Skip to content

Commit 87d3bb5

Browse files
committed
DATAREDIS-1084 - Polishing.
Rename PendingMessage.getStringId to PendingMessage.getIdAsString for consistent naming scheme. Remove getElapsedTimeSinceLastDeliveryMS method for now in favor of getElapsedTimeSinceLastDelivery. Use primitive long instead of boxed wrapper to avoid nullability where possible. Move PendingMessage* converters to StreamConverters. Add assertions. Original pull request: spring-projects#512.
1 parent 52fd648 commit 87d3bb5

15 files changed

+214
-194
lines changed

src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ default Flux<ByteBufferRecord> xClaim(ByteBuffer key, String group, String newOw
353353

354354
/**
355355
* Change the ownership of a pending message to the given new {@literal consumer}.
356-
*
356+
*
357357
* @param commands must not be {@literal null}.
358358
* @return
359359
* @see <a href="https://redis.io/commands/xclaim">Redis Documentation: XCLAIM</a>
@@ -365,27 +365,28 @@ default Flux<ByteBufferRecord> xClaim(ByteBuffer key, String group, String newOw
365365
* {@code XCLAIM} command parameters.
366366
*
367367
* @see <a href="https://redis.io/commands/xclaim">Redis Documentation: XCLAIM</a>
368+
* @since 2.3
368369
*/
369370
class XClaimCommand extends KeyCommand {
370371

371372
private final String groupName;
372-
private final String consumerName;
373+
private final String newOwner;
373374
private final XClaimOptions options;
374375

375-
private XClaimCommand(@Nullable ByteBuffer key, String groupName, String consumerName, XClaimOptions options) {
376+
private XClaimCommand(@Nullable ByteBuffer key, String groupName, String newOwner, XClaimOptions options) {
376377

377378
super(key);
378379
this.groupName = groupName;
379-
this.consumerName = consumerName;
380+
this.newOwner = newOwner;
380381
this.options = options;
381382
}
382383

383384
public XClaimOptions getOptions() {
384385
return options;
385386
}
386387

387-
public String getConsumerName() {
388-
return consumerName;
388+
public String getNewOwner() {
389+
return newOwner;
389390
}
390391

391392
public String getGroupName() {
@@ -672,7 +673,7 @@ private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String
672673

673674
/**
674675
* Create new unbounded {@link PendingRecordsCommand}.
675-
*
676+
*
676677
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
677678
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
678679
* @return new instance of {@link PendingRecordsCommand}.

src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.springframework.data.redis.connection.RedisZSetCommands.Limit;
2828
import org.springframework.data.redis.connection.stream.*;
2929
import org.springframework.lang.Nullable;
30+
import org.springframework.util.Assert;
3031
import org.springframework.util.StringUtils;
3132

3233
/**
@@ -121,7 +122,7 @@ default List<ByteRecord> xClaim(byte[] key, String group, String newOwner, Durat
121122

122123
/**
123124
* Change the ownership of a pending message to the given new {@literal consumer}.
124-
*
125+
*
125126
* @param key the {@literal key} the stream is stored at.
126127
* @param group the name of the {@literal consumer group}.
127128
* @param newOwner the name of the new {@literal consumer}.
@@ -145,8 +146,8 @@ class XClaimOptions {
145146
private final @Nullable Long retryCount;
146147
private final boolean force;
147148

148-
private XClaimOptions(List<RecordId> ids, Duration minIdleTime, Duration idleTime, Instant unixTime,
149-
Long retryCount, boolean force) {
149+
private XClaimOptions(List<RecordId> ids, Duration minIdleTime, @Nullable Duration idleTime,
150+
@Nullable Instant unixTime, @Nullable Long retryCount, boolean force) {
150151

151152
this.ids = new ArrayList<>(ids);
152153
this.minIdleTime = minIdleTime;
@@ -159,7 +160,7 @@ private XClaimOptions(List<RecordId> ids, Duration minIdleTime, Duration idleTim
159160
/**
160161
* Set the {@literal min-idle-time} to limit the command to messages that have been idle for at at least the given
161162
* {@link Duration}.
162-
*
163+
*
163164
* @param minIdleTime must not be {@literal null}.
164165
* @return new instance of {@link XClaimOptions}.
165166
*/
@@ -206,7 +207,7 @@ public XClaimOptions time(Instant unixTime) {
206207
* @param retryCount can be {@literal null}. If {@literal null} no change to the retry counter will be made.
207208
* @return new instance of {@link XClaimOptions}.
208209
*/
209-
public XClaimOptions retryCount(@Nullable Long retryCount) {
210+
public XClaimOptions retryCount(long retryCount) {
210211
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force);
211212
}
212213

@@ -249,7 +250,7 @@ public Duration getMinIdleTime() {
249250

250251
/**
251252
* Get the {@literal IDLE ms} time.
252-
*
253+
*
253254
* @return can be {@literal null}.
254255
*/
255256
@Nullable
@@ -259,7 +260,7 @@ public Duration getIdleTime() {
259260

260261
/**
261262
* Get the {@literal TIME ms-unix-time}
262-
*
263+
*
263264
* @return
264265
*/
265266
@Nullable
@@ -269,7 +270,7 @@ public Instant getUnixTime() {
269270

270271
/**
271272
* Get the {@literal RETRYCOUNT count}.
272-
*
273+
*
273274
* @return
274275
*/
275276
@Nullable
@@ -279,7 +280,7 @@ public Long getRetryCount() {
279280

280281
/**
281282
* Get the {@literal FORCE} flag.
282-
*
283+
*
283284
* @return
284285
*/
285286
public boolean isForce() {
@@ -288,9 +289,12 @@ public boolean isForce() {
288289

289290
public static class XClaimOptionsBuilder {
290291

291-
private Duration minIdleTime;
292+
private final Duration minIdleTime;
292293

293294
XClaimOptionsBuilder(Duration minIdleTime) {
295+
296+
Assert.notNull(minIdleTime, "Min idle time must not be null!");
297+
294298
this.minIdleTime = minIdleTime;
295299
}
296300

@@ -412,7 +416,7 @@ default Boolean xGroupDelConsumer(byte[] key, String groupName, String consumerN
412416

413417
/**
414418
* Obtain the {@link PendingMessagesSummary} for a given {@literal consumer group}.
415-
*
419+
*
416420
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
417421
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
418422
* @return a summary of pending messages within the given {@literal consumer group} or {@literal null} when used in
@@ -425,7 +429,7 @@ default Boolean xGroupDelConsumer(byte[] key, String groupName, String consumerN
425429

426430
/**
427431
* Obtained detailed information about all pending messages for a given {@link Consumer}.
428-
*
432+
*
429433
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
430434
* @param consumer the consumer to fetch {@link PendingMessages} for. Must not be {@literal null}.
431435
* @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
@@ -542,7 +546,7 @@ private XPendingOptions(@Nullable String consumerName, Range<?> range, @Nullable
542546

543547
/**
544548
* Create new {@link XPendingOptions} with an unbounded {@link Range} ({@literal - +}).
545-
*
549+
*
546550
* @return new instance of {@link XPendingOptions}.
547551
*/
548552
public static XPendingOptions unbounded() {
@@ -561,7 +565,7 @@ public static XPendingOptions unbounded(Long count) {
561565

562566
/**
563567
* Create new {@link XPendingOptions} with given {@link Range} and limit.
564-
*
568+
*
565569
* @return new instance of {@link XPendingOptions}.
566570
*/
567571
public static XPendingOptions range(Range<?> range, Long count) {
@@ -570,7 +574,7 @@ public static XPendingOptions range(Range<?> range, Long count) {
570574

571575
/**
572576
* Append given consumer.
573-
*
577+
*
574578
* @param consumerName must not be {@literal null}.
575579
* @return new instance of {@link XPendingOptions}.
576580
*/

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConverters.java

Lines changed: 0 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,6 @@ abstract public class LettuceConverters extends Converters {
117117
private static final Converter<KeyValue<Object, Object>, Object> KEY_VALUE_UNWRAPPER;
118118
private static final ListConverter<KeyValue<Object, Object>, Object> KEY_VALUE_LIST_UNWRAPPER;
119119
private static final Converter<TransactionResult, List<Object>> TRANSACTION_RESULT_UNWRAPPER;
120-
private static final BiFunction<List<Object>, String, org.springframework.data.redis.connection.stream.PendingMessages> PENDING_MESSAGES_CONVERTER;
121-
private static final BiFunction<List<Object>, String, PendingMessagesSummary> PENDING_MESSAGES_SUMMARY_CONVERTER;
122120

123121
public static final byte[] PLUS_BYTES;
124122
public static final byte[] MINUS_BYTES;
@@ -320,69 +318,7 @@ private Set<Flag> parseFlags(Set<NodeFlag> source) {
320318

321319
TRANSACTION_RESULT_UNWRAPPER = transactionResult -> transactionResult.stream().collect(Collectors.toList());
322320

323-
PENDING_MESSAGES_CONVERTER = (source, groupName) -> {
324321

325-
List<Object> target = source.stream().map(LettuceConverters::preConvertNativeValues).collect(Collectors.toList());
326-
List<PendingMessage> pendingMessages = PendingParser.parseRange(target);
327-
328-
List<org.springframework.data.redis.connection.stream.PendingMessage> messages = pendingMessages.stream()
329-
.map(it -> {
330-
331-
RecordId id = RecordId.of(it.getId());
332-
Consumer consumer = Consumer.from(groupName, it.getConsumer());
333-
334-
return new org.springframework.data.redis.connection.stream.PendingMessage(id, consumer,
335-
Duration.ofMillis(it.getMsSinceLastDelivery()), it.getRedeliveryCount());
336-
337-
}).collect(Collectors.toList());
338-
339-
return new org.springframework.data.redis.connection.stream.PendingMessages(groupName, messages);
340-
341-
};
342-
343-
PENDING_MESSAGES_SUMMARY_CONVERTER = (source, groupName) -> {
344-
345-
List<Object> target = source.stream().map(LettuceConverters::preConvertNativeValues).collect(Collectors.toList());
346-
347-
PendingMessages pendingMessages = PendingParser.parse(target);
348-
org.springframework.data.domain.Range<String> range = org.springframework.data.domain.Range.open(
349-
pendingMessages.getMessageIds().getLower().getValue(), pendingMessages.getMessageIds().getUpper().getValue());
350-
351-
return new PendingMessagesSummary(groupName, pendingMessages.getCount(), range,
352-
pendingMessages.getConsumerMessageCount());
353-
};
354-
}
355-
356-
/**
357-
* We need to convert values into the correct target type since lettuce will give us {@link ByteBuffer} or arrays but
358-
* the parser requires us to have them as {@link String} or numeric values. Oh and {@literal null} values aren't real
359-
* good citizens as well, so we make them empty strings instead - see it works - somehow ;P
360-
*
361-
* @param value dont't get me started om this.
362-
* @return preconverted values that Lettuce parsers are able to understand \ö/.
363-
*/
364-
private static Object preConvertNativeValues(Object value) {
365-
366-
if (value instanceof ByteBuffer || value instanceof byte[]) {
367-
368-
byte[] targetArray = value instanceof ByteBuffer ? ByteUtils.getBytes((ByteBuffer) value) : (byte[]) value;
369-
String tmp = toString(targetArray);
370-
371-
try {
372-
return NumberUtils.parseNumber(tmp, Long.class);
373-
} catch (NumberFormatException e) {
374-
return tmp;
375-
}
376-
}
377-
if (value instanceof List) {
378-
List<Object> targetList = new ArrayList<>();
379-
for (Object it : (List) value) {
380-
targetList.add(preConvertNativeValues(it));
381-
}
382-
return targetList;
383-
}
384-
385-
return value != null ? value : "";
386322
}
387323

388324
public static List<Tuple> toTuple(List<byte[]> list) {
@@ -1117,32 +1053,6 @@ static long getUpperBoundIndex(org.springframework.data.domain.Range<Long> range
11171053
return getUpperBound(range).orElse(INDEXED_RANGE_END);
11181054
}
11191055

1120-
/**
1121-
* Convert the raw lettuce xpending result to {@link PendingMessages}.
1122-
*
1123-
* @param groupName the group name
1124-
* @param range the range of messages requested
1125-
* @param source the raw lettuce response.
1126-
* @return
1127-
* @since 2.3
1128-
*/
1129-
static org.springframework.data.redis.connection.stream.PendingMessages toPendingMessages(String groupName,
1130-
org.springframework.data.domain.Range<?> range, List<Object> source) {
1131-
return PENDING_MESSAGES_CONVERTER.apply(source, groupName).withinRange(range);
1132-
}
1133-
1134-
/**
1135-
* Convert the raw lettuce xpending result to {@link PendingMessagesSummary}.
1136-
*
1137-
* @param groupName
1138-
* @param source the raw lettuce response.
1139-
* @return
1140-
* @since 2.3
1141-
*/
1142-
static PendingMessagesSummary toPendingMessagesInfo(String groupName, List<Object> source) {
1143-
return PENDING_MESSAGES_SUMMARY_CONVERTER.apply(source, groupName);
1144-
}
1145-
11461056
/**
11471057
* @author Christoph Strobl
11481058
* @since 1.8

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public Flux<CommandResponse<XClaimCommand, Flux<RecordId>>> xClaimJustId(Publish
121121

122122
String[] ids = command.getOptions().getIdsAsStringArray();
123123
io.lettuce.core.Consumer<ByteBuffer> from = io.lettuce.core.Consumer
124-
.from(ByteUtils.getByteBuffer(command.getGroupName()), ByteUtils.getByteBuffer(command.getConsumerName()));
124+
.from(ByteUtils.getByteBuffer(command.getGroupName()), ByteUtils.getByteBuffer(command.getNewOwner()));
125125
XClaimArgs args = StreamConverters.toXClaimArgs(command.getOptions());
126126

127127
Flux<RecordId> result = cmd.xclaim(command.getKey(), from, args, ids).map(it -> RecordId.of(it.getId()));
@@ -140,7 +140,7 @@ public Flux<CommandResponse<XClaimCommand, Flux<ByteBufferRecord>>> xClaim(Publi
140140

141141
String[] ids = command.getOptions().getIdsAsStringArray();
142142
io.lettuce.core.Consumer<ByteBuffer> from = io.lettuce.core.Consumer
143-
.from(ByteUtils.getByteBuffer(command.getGroupName()), ByteUtils.getByteBuffer(command.getConsumerName()));
143+
.from(ByteUtils.getByteBuffer(command.getGroupName()), ByteUtils.getByteBuffer(command.getNewOwner()));
144144
XClaimArgs args = StreamConverters.toXClaimArgs(command.getOptions());
145145

146146
Flux<ByteBufferRecord> result = cmd.xclaim(command.getKey(), from, args, ids)
@@ -249,7 +249,7 @@ public Flux<CommandResponse<PendingRecordsCommand, PendingMessagesSummary>> xPen
249249
// end.
250250
// end.
251251

252-
return LettuceConverters.toPendingMessagesInfo(command.getGroupName(), target);
252+
return StreamConverters.toPendingMessagesInfo(command.getGroupName(), target);
253253
}).map(value -> new CommandResponse<PendingRecordsCommand, PendingMessagesSummary>(command, value));
254254
}));
255255
}
@@ -268,15 +268,15 @@ public Flux<CommandResponse<PendingRecordsCommand, PendingMessages>> xPending(
268268
ByteBuffer groupName = ByteUtils.getByteBuffer(command.getGroupName());
269269
io.lettuce.core.Range<String> range = RangeConverter.toRangeWithDefault(command.getRange(), "-", "+");
270270
io.lettuce.core.Limit limit = command.isLimited() ? io.lettuce.core.Limit.from(command.getCount())
271-
: io.lettuce.core.Limit.from(Long.MAX_VALUE);
271+
: io.lettuce.core.Limit.unlimited();
272272

273273
Flux<Object> publisher = command.hasConsumer() ? cmd.xpending(command.getKey(),
274274
io.lettuce.core.Consumer.from(groupName, ByteUtils.getByteBuffer(command.getConsumerName())), range, limit)
275275
: cmd.xpending(command.getKey(), groupName, range, limit);
276276

277277
return publisher.collectList().map(it -> {
278278

279-
return LettuceConverters.toPendingMessages(command.getGroupName(), command.getRange(), it);
279+
return StreamConverters.toPendingMessages(command.getGroupName(), command.getRange(), it);
280280
}).map(value -> new CommandResponse<PendingRecordsCommand, PendingMessages>(command, value));
281281
}));
282282
}

0 commit comments

Comments
 (0)