Skip to content

Commit ca01670

Browse files
christophstroblmp911de
authored andcommitted
DATAREDIS-1084 - Add support for XPENDING.
Original pull request: spring-projects#512.
1 parent 18d4ea1 commit ca01670

21 files changed

+1771
-47
lines changed

src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import org.springframework.data.redis.connection.stream.ByteRecord;
3838
import org.springframework.data.redis.connection.stream.Consumer;
3939
import org.springframework.data.redis.connection.stream.MapRecord;
40+
import org.springframework.data.redis.connection.stream.PendingMessages;
41+
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
4042
import org.springframework.data.redis.connection.stream.ReadOffset;
4143
import org.springframework.data.redis.connection.stream.RecordId;
4244
import org.springframework.data.redis.connection.stream.StreamOffset;
@@ -3694,6 +3696,44 @@ public Long xLen(String key) {
36943696
return convertAndReturn(delegate.xLen(serialize(key)), identityConverter);
36953697
}
36963698

3699+
/*
3700+
* (non-Javadoc)
3701+
* @see org.springframework.data.redis.connection.StringRedisConnection#xPending(java.lang.String, java.lang.String)
3702+
*/
3703+
@Override
3704+
public PendingMessagesSummary xPending(String key, String groupName) {
3705+
return convertAndReturn(delegate.xPending(serialize(key), groupName), identityConverter);
3706+
}
3707+
3708+
/*
3709+
* (non-Javadoc)
3710+
* @see org.springframework.data.redis.connection.StringRedisConnection#xPending(java.lang.String, java.lang.String, java.lang.String, org.springframework.data.domain.Range, java.lang.Long)
3711+
*/
3712+
@Override
3713+
public PendingMessages xPending(String key, String groupName, String consumer,
3714+
org.springframework.data.domain.Range<String> range, Long count) {
3715+
return convertAndReturn(delegate.xPending(serialize(key), groupName, consumer, range, count), identityConverter);
3716+
}
3717+
3718+
/*
3719+
* (non-Javadoc)
3720+
* @see org.springframework.data.redis.connection.StringRedisConnection#xPending(java.lang.String, java.lang.String, org.springframework.data.domain.Range, java.lang.Long)
3721+
*/
3722+
@Override
3723+
public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range<String> range,
3724+
Long count) {
3725+
return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count), identityConverter);
3726+
}
3727+
3728+
/*
3729+
* (non-Javadoc)
3730+
* @see org.springframework.data.redis.connection.StringRedisConnection#xPending(java.lang.String, org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions)
3731+
*/
3732+
@Override
3733+
public PendingMessages xPending(String key, String groupName, XPendingOptions options) {
3734+
return convertAndReturn(delegate.xPending(serialize(key), groupName, options), identityConverter);
3735+
}
3736+
36973737
/*
36983738
* (non-Javadoc)
36993739
* @see org.springframework.data.redis.connection.StringRedisConnection#xRange(java.lang.String, org.springframework.data.domain.Range, org.springframework.data.redis.connection.RedisZSetCommands.Limit)
@@ -3719,7 +3759,7 @@ public List<StringRecord> xReadAsString(StreamReadOptions readOptions, StreamOff
37193759
*/
37203760
@Override
37213761
public List<StringRecord> xReadGroupAsString(Consumer consumer, StreamReadOptions readOptions,
3722-
StreamOffset<String>... streams) {
3762+
StreamOffset<String>... streams) {
37233763

37243764
return convertAndReturn(delegate.xReadGroup(consumer, readOptions, serialize(streams)),
37253765
listByteMapRecordToStringMapRecordConverter);
@@ -3808,6 +3848,24 @@ public Long xLen(byte[] key) {
38083848
return delegate.xLen(key);
38093849
}
38103850

3851+
/*
3852+
* (non-Javadoc)
3853+
* @see org.springframework.data.redis.connection.RedisStreamCommands#xPending(byte[], java.lang.String)
3854+
*/
3855+
@Override
3856+
public PendingMessagesSummary xPending(byte[] key, String groupName) {
3857+
return delegate.xPending(key, groupName);
3858+
}
3859+
3860+
/*
3861+
* (non-Javadoc)
3862+
* @see org.springframework.data.redis.connection.RedisStreamCommands#xPending(byte[], java.lang.String)
3863+
*/
3864+
@Override
3865+
public PendingMessages xPending(byte[] key, String groupName, XPendingOptions options) {
3866+
return delegate.xPending(key, groupName, options);
3867+
}
3868+
38113869
/*
38123870
* (non-Javadoc)
38133871
* @see org.springframework.data.redis.connection.RedisStreamCommands#xRange(byte[], org.springframework.data.domain.Range, org.springframework.data.redis.connection.RedisZSetCommands.Limit)
@@ -3832,7 +3890,7 @@ public List<ByteRecord> xRead(StreamReadOptions readOptions, StreamOffset<byte[]
38323890
*/
38333891
@Override
38343892
public List<ByteRecord> xReadGroup(Consumer consumer, StreamReadOptions readOptions,
3835-
StreamOffset<byte[]>... streams) {
3893+
StreamOffset<byte[]>... streams) {
38363894
return delegate.xReadGroup(consumer, readOptions, streams);
38373895
}
38383896

src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.springframework.data.redis.connection.stream.ByteRecord;
3232
import org.springframework.data.redis.connection.stream.Consumer;
3333
import org.springframework.data.redis.connection.stream.MapRecord;
34+
import org.springframework.data.redis.connection.stream.PendingMessages;
35+
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
3436
import org.springframework.data.redis.connection.stream.ReadOffset;
3537
import org.springframework.data.redis.connection.stream.RecordId;
3638
import org.springframework.data.redis.connection.stream.StreamOffset;
@@ -486,6 +488,20 @@ default Long xLen(byte[] key) {
486488
return streamCommands().xLen(key);
487489
}
488490

491+
/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
492+
@Override
493+
@Deprecated
494+
default PendingMessagesSummary xPending(byte[] key, String groupName) {
495+
return streamCommands().xPending(key, groupName);
496+
}
497+
498+
/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
499+
@Override
500+
@Deprecated
501+
default PendingMessages xPending(byte[] key, String groupName, XPendingOptions options) {
502+
return streamCommands().xPending(key, groupName, options);
503+
}
504+
489505
/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
490506
@Override
491507
@Deprecated
@@ -525,7 +541,7 @@ default List<ByteRecord> xReadGroup(Consumer consumer, StreamOffset<byte[]>... s
525541
@Override
526542
@Deprecated
527543
default List<ByteRecord> xReadGroup(Consumer consumer, StreamReadOptions readOptions,
528-
StreamOffset<byte[]>... streams) {
544+
StreamOffset<byte[]>... streams) {
529545
return streamCommands().xReadGroup(consumer, readOptions, streams);
530546
}
531547

src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,21 @@
3030
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
3131
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
3232
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
33+
import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions;
3334
import org.springframework.data.redis.connection.RedisZSetCommands.Limit;
3435
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
3536
import org.springframework.data.redis.connection.stream.Consumer;
37+
import org.springframework.data.redis.connection.stream.PendingMessage;
38+
import org.springframework.data.redis.connection.stream.PendingMessages;
39+
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
3640
import org.springframework.data.redis.connection.stream.ReadOffset;
3741
import org.springframework.data.redis.connection.stream.RecordId;
3842
import org.springframework.data.redis.connection.stream.StreamOffset;
3943
import org.springframework.data.redis.connection.stream.StreamReadOptions;
4044
import org.springframework.data.redis.connection.stream.StreamRecords;
4145
import org.springframework.lang.Nullable;
4246
import org.springframework.util.Assert;
47+
import org.springframework.util.StringUtils;
4348

4449
/**
4550
* Stream-specific Redis commands executed using reactive infrastructure.
@@ -410,6 +415,225 @@ default Mono<Long> xLen(ByteBuffer key) {
410415
*/
411416
Flux<NumericResponse<KeyCommand, Long>> xLen(Publisher<KeyCommand> commands);
412417

418+
/**
419+
* Obtain the {@link PendingMessagesSummary} for a given {@literal consumer group}.
420+
*
421+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
422+
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
423+
* @return {@link Mono} emitting a summary of pending messages within the given {@literal consumer group}.
424+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
425+
* @since 2.3
426+
*/
427+
default Mono<PendingMessagesSummary> xPending(ByteBuffer key, String groupName) {
428+
429+
Assert.notNull(key, "Key must not be null!");
430+
Assert.notNull(groupName, "GroupName must not be null!");
431+
432+
return xPendingSummary(Mono.just(new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null))).next()
433+
.map(CommandResponse::getOutput);
434+
}
435+
436+
/**
437+
* Obtain the {@link PendingMessagesSummary} for a given {@literal consumer group}.
438+
*
439+
* @param commands must not be {@literal null}..
440+
* @return {@link Flux} emitting a summary of pending messages within the given {@literal consumer group} one by one.
441+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
442+
* @since 2.3
443+
*/
444+
Flux<CommandResponse<PendingRecordsCommand, PendingMessagesSummary>> xPendingSummary(
445+
Publisher<PendingRecordsCommand> commands);
446+
447+
/**
448+
* Obtained detailed information about all pending messages for a given {@link Consumer}.
449+
*
450+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
451+
* @param consumer the consumer to fetch {@link PendingMessages} for. Must not be {@literal null}.
452+
* @return {@link Mono} emitting pending messages for the given {@link Consumer}.
453+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
454+
* @since 2.3
455+
*/
456+
@Nullable
457+
default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer) {
458+
return xPending(key, consumer.getGroup(), consumer.getName());
459+
}
460+
461+
/**
462+
* Obtained detailed information about all pending messages for a given {@literal consumer}.
463+
*
464+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
465+
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
466+
* @param consumerName the consumer to fetch {@link PendingMessages} for. Must not be {@literal null}.
467+
* @return {@link Mono} emitting pending messages for the given {@link Consumer}.
468+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
469+
* @since 2.3
470+
*/
471+
@Nullable
472+
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName) {
473+
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, Range.unbounded(), null))).next()
474+
.map(CommandResponse::getOutput);
475+
}
476+
477+
/**
478+
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a
479+
* {@literal consumer group}.
480+
*
481+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
482+
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
483+
* @param range the range of messages ids to search within. Must not be {@literal null}.
484+
* @param count limit the number of results. Must not be {@literal null}.
485+
* @return {@link Mono} emitting pending messages for the given {@literal consumer group}. transaction.
486+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
487+
* @since 2.3
488+
*/
489+
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?> range, Long count) {
490+
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, null, range, count))).next()
491+
.map(CommandResponse::getOutput);
492+
}
493+
494+
/**
495+
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
496+
* {@link Consumer} within a {@literal consumer group}.
497+
*
498+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
499+
* @param consumer the name of the {@link Consumer}. Must not be {@literal null}.
500+
* @param range the range of messages ids to search within. Must not be {@literal null}.
501+
* @param count limit the number of results. Must not be {@literal null}.
502+
* @return {@link Mono} emitting pending messages for the given {@link Consumer}.
503+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
504+
* @since 2.3
505+
*/
506+
default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<?> range, Long count) {
507+
return xPending(key, consumer.getGroup(), consumer.getName(), range, count);
508+
}
509+
510+
/**
511+
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
512+
* {@literal consumer} within a {@literal consumer group}.
513+
*
514+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
515+
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
516+
* @param consumerName the name of the {@literal consumer}. Must not be {@literal null}.
517+
* @param range the range of messages ids to search within. Must not be {@literal null}.
518+
* @param count limit the number of results. Must not be {@literal null}.
519+
* @return {@link Mono} emitting pending messages for the given {@literal consumer} in given
520+
* {@literal consumer group}.
521+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
522+
* @since 2.3
523+
*/
524+
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName, Range<?> range,
525+
Long count) {
526+
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, range, count))).next()
527+
.map(CommandResponse::getOutput);
528+
}
529+
530+
/**
531+
* Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions
532+
* options}.
533+
*
534+
* @param commands must not be {@literal null}.
535+
* @return {@link Flux} emitting pending messages matching given criteria.
536+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
537+
* @since 2.3
538+
*/
539+
Flux<CommandResponse<PendingRecordsCommand, PendingMessages>> xPending(Publisher<PendingRecordsCommand> commands);
540+
541+
/**
542+
* Value Object holding parameters for obtaining pending messages.
543+
*
544+
* @author Christoph Strobl
545+
* @since 2.3
546+
*/
547+
class PendingRecordsCommand extends KeyCommand {
548+
549+
private final String groupName;
550+
private final @Nullable String consumerName;
551+
private final Range<?> range;
552+
private final @Nullable Long count;
553+
554+
private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String consumerName, Range<?> range,
555+
@Nullable Long count) {
556+
557+
super(key);
558+
559+
this.groupName = groupName;
560+
this.consumerName = consumerName;
561+
this.range = range;
562+
this.count = count;
563+
}
564+
565+
/**
566+
* Create new unbounded {@link PendingRecordsCommand}.
567+
*
568+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
569+
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
570+
* @return new instance of {@link PendingRecordsCommand}.
571+
*/
572+
static PendingRecordsCommand pending(ByteBuffer key, String groupName) {
573+
return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null);
574+
}
575+
576+
/**
577+
* Create new {@link PendingRecordsCommand} with given {@link Range} and limit.
578+
*
579+
* @return new instance of {@link XPendingOptions}.
580+
*/
581+
public PendingRecordsCommand range(Range<String> range, Long count) {
582+
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
583+
}
584+
585+
/**
586+
* Append given consumer.
587+
*
588+
* @param consumerName must not be {@literal null}.
589+
* @return new instance of {@link PendingRecordsCommand}.
590+
*/
591+
public PendingRecordsCommand consumer(String consumerName) {
592+
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
593+
}
594+
595+
public String getGroupName() {
596+
return groupName;
597+
}
598+
599+
/**
600+
* @return can be {@literal null}.
601+
*/
602+
@Nullable
603+
public String getConsumerName() {
604+
return consumerName;
605+
}
606+
607+
/**
608+
* @return never {@literal null}.
609+
*/
610+
public Range<?> getRange() {
611+
return range;
612+
}
613+
614+
/**
615+
* @return can be {@literal null}.
616+
*/
617+
@Nullable
618+
public Long getCount() {
619+
return count;
620+
}
621+
622+
/**
623+
* @return {@literal true} if a consumer name is present.
624+
*/
625+
public boolean hasConsumer() {
626+
return StringUtils.hasText(consumerName);
627+
}
628+
629+
/**
630+
* @return {@literal true} count is set.
631+
*/
632+
public boolean isLimited() {
633+
return count != null && count > -1;
634+
}
635+
}
636+
413637
/**
414638
* {@code XRANGE}/{@code XREVRANGE} command parameters.
415639
*

0 commit comments

Comments
 (0)