Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
* @author ihaohong
* @author Dennis Neufeld
* @author Shyngys Sapraliyev
* @author Jeonggyu Choi
*/
@SuppressWarnings({ "ConstantConditions", "deprecation" })
public class DefaultStringRedisConnection implements StringRedisConnection, DecoratedRedisConnection {
Expand Down Expand Up @@ -2968,12 +2969,26 @@ public PendingMessages xPending(String key, String groupName, String consumer,
Converters.identityConverter());
}

@Override
public PendingMessages xPending(String key, String groupName, String consumerName,
org.springframework.data.domain.Range<String> range, Long count, Duration idle) {
return convertAndReturn(delegate.xPending(serialize(key), groupName, consumerName, range, count, idle),
Converters.identityConverter());
}

@Override
public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range<String> range,
Long count) {
return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count), Converters.identityConverter());
}

@Override
public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range<String> range,
Long count, Duration idle) {
return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count, idle),
Converters.identityConverter());
}

@Override
public PendingMessages xPending(String key, String groupName, XPendingOptions options) {
return convertAndReturn(delegate.xPending(serialize(key), groupName, options), Converters.identityConverter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
* @author Dengliming
* @author Mark John Moreno
* @author jinkshower
* @author Jeonggyu Choi
* @since 2.2
*/
public interface ReactiveStreamCommands {
Expand Down Expand Up @@ -747,6 +748,25 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?
.map(CommandResponse::getOutput);
}

/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a
* {@literal consumer group} and over a given {@link Duration} of idle time.
*
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
* @param range the range of messages ids to search within. Must not be {@literal null}.
* @param count limit the number of results. Must not be {@literal null}.
* @param idle the minimum idle time to filter pending messages. Must not be {@literal null}.
* @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
* transaction.
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
* @since 3.5
*/
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?> range, Long count, Duration idle) {
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count).idle(idle))).next()
.map(CommandResponse::getOutput);
}

/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
* {@link Consumer} within a {@literal consumer group}.
Expand All @@ -763,6 +783,23 @@ default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<
return xPending(key, consumer.getGroup(), consumer.getName(), range, count);
}

/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
* {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
*
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
* @param consumer the name of the {@link Consumer}. Must not be {@literal null}.
* @param range the range of messages ids to search within. Must not be {@literal null}.
* @param count limit the number of results. Must not be {@literal null}.
* @param idle the minimum idle time to filter pending messages. Must not be {@literal null}.
* @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
* @since 3.5
*/
default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<?> range, Long count, Duration idle) {
return xPending(key, consumer.getGroup(), consumer.getName(), range, count, idle);
}

/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
* {@literal consumer} within a {@literal consumer group}.
Expand All @@ -783,6 +820,28 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
.next().map(CommandResponse::getOutput);
}

/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
* {@literal consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
*
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
* @param consumerName the name of the {@literal consumer}. Must not be {@literal null}.
* @param range the range of messages ids to search within. Must not be {@literal null}.
* @param count limit the number of results. Must not be {@literal null}.
* @param idle the minimum idle time to filter pending messages. Must not be {@literal null}.
* @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null}
* when used in pipeline / transaction.
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
* @since 3.5
*/
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName, Range<?> range,
Long count, Duration idle) {
return xPending(
Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count).idle(idle)))
.next().map(CommandResponse::getOutput);
}

/**
* Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions
* options}.
Expand All @@ -798,6 +857,7 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
* Value Object holding parameters for obtaining pending messages.
*
* @author Christoph Strobl
* @author Jeonggyu Choi
* @since 2.3
*/
class PendingRecordsCommand extends KeyCommand {
Expand All @@ -806,16 +866,18 @@ class PendingRecordsCommand extends KeyCommand {
private final @Nullable String consumerName;
private final Range<?> range;
private final @Nullable Long count;
private final @Nullable Duration idle;

private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String consumerName, Range<?> range,
@Nullable Long count) {
@Nullable Long count, @Nullable Duration idle) {

super(key);

this.groupName = groupName;
this.consumerName = consumerName;
this.range = range;
this.count = count;
this.idle = idle;
}

/**
Expand All @@ -826,7 +888,7 @@ private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String
* @return new instance of {@link PendingRecordsCommand}.
*/
static PendingRecordsCommand pending(ByteBuffer key, String groupName) {
return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null);
return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null, null);
}

/**
Expand All @@ -841,7 +903,7 @@ public PendingRecordsCommand range(Range<?> range, Long count) {
Assert.notNull(range, "Range must not be null");
Assert.isTrue(count > -1, "Count must not be negative");

return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, null);
}

/**
Expand All @@ -851,7 +913,20 @@ public PendingRecordsCommand range(Range<?> range, Long count) {
* @return new instance of {@link PendingRecordsCommand}.
*/
public PendingRecordsCommand consumer(String consumerName) {
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, idle);
}

/**
* Append given idle time.
*
* @param idle must not be {@literal null}.
* @return new instance of {@link PendingRecordsCommand}.
*/
public PendingRecordsCommand idle(Duration idle) {

Assert.notNull(idle, "Idle must not be null");

return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, idle);
}

public String getGroupName() {
Expand Down Expand Up @@ -881,6 +956,14 @@ public Long getCount() {
return count;
}

/**
* @return can be {@literal null}.
*/
@Nullable
public Duration getIdle() {
return idle;
}

/**
* @return {@literal true} if a consumer name is present.
*/
Expand All @@ -894,6 +977,13 @@ public boolean hasConsumer() {
public boolean isLimited() {
return count != null;
}

/**
* @return {@literal true} if idle is set.
*/
public boolean hasIdle() {
return idle != null;
}
}

/**
Expand Down
Loading