Skip to content

Commit c010472

Browse files
christophstroblmp911de
authored andcommitted
DATAREDIS-1119 - Support XINFO via RedisStreamCommands.
Original pull request: spring-projects#519.
1 parent 0d2e447 commit c010472

File tree

10 files changed

+1078
-2
lines changed

10 files changed

+1078
-2
lines changed

src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@
4141
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
4242
import org.springframework.data.redis.connection.stream.ReadOffset;
4343
import org.springframework.data.redis.connection.stream.RecordId;
44+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers;
45+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroups;
46+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
4447
import org.springframework.data.redis.connection.stream.StreamOffset;
4548
import org.springframework.data.redis.connection.stream.StreamReadOptions;
4649
import org.springframework.data.redis.connection.stream.StringRecord;
@@ -3666,7 +3669,8 @@ public List<RecordId> xClaimJustId(String key, String group, String consumer, XC
36663669
*/
36673670
@Override
36683671
public List<StringRecord> xClaim(String key, String group, String consumer, XClaimOptions options) {
3669-
return convertAndReturn(delegate.xClaim(serialize(key), group, consumer, options), listByteMapRecordToStringMapRecordConverter);
3672+
return convertAndReturn(delegate.xClaim(serialize(key), group, consumer, options),
3673+
listByteMapRecordToStringMapRecordConverter);
36703674
}
36713675

36723676
/*
@@ -3705,6 +3709,33 @@ public Boolean xGroupDestroy(String key, String group) {
37053709
return convertAndReturn(delegate.xGroupDestroy(serialize(key), group), identityConverter);
37063710
}
37073711

3712+
/*
3713+
* (non-Javadoc)
3714+
* @see org.springframework.data.redis.connection.StringRedisConnection#xInfo(java.lang.String)
3715+
*/
3716+
@Override
3717+
public XInfoStream xInfo(String key) {
3718+
return convertAndReturn(delegate.xInfo(serialize(key)), identityConverter);
3719+
}
3720+
3721+
/*
3722+
* (non-Javadoc)
3723+
* @see org.springframework.data.redis.connection.StringRedisConnection#xInfoGroups(java.lang.String)
3724+
*/
3725+
@Override
3726+
public XInfoGroups xInfoGroups(String key) {
3727+
return convertAndReturn(delegate.xInfoGroups(serialize(key)), identityConverter);
3728+
}
3729+
3730+
/*
3731+
* (non-Javadoc)
3732+
* @see org.springframework.data.redis.connection.StringRedisConnection#xInfoConsumers(java.lang.String, java.lang.String)
3733+
*/
3734+
@Override
3735+
public XInfoConsumers xInfoConsumers(String key, String groupName) {
3736+
return convertAndReturn(delegate.xInfoConsumers(serialize(key), groupName), identityConverter);
3737+
}
3738+
37083739
/*
37093740
* (non-Javadoc)
37103741
* @see org.springframework.data.redis.connection.StringRedisConnection#xLen(java.lang.String)
@@ -3875,6 +3906,33 @@ public Boolean xGroupDestroy(byte[] key, String groupName) {
38753906
return delegate.xGroupDestroy(key, groupName);
38763907
}
38773908

3909+
/*
3910+
* (non-Javadoc)
3911+
* @see org.springframework.data.redis.connection.RedisStreamCommands#xInfo(byte[])
3912+
*/
3913+
@Override
3914+
public XInfoStream xInfo(byte[] key) {
3915+
return delegate.xInfo(key);
3916+
}
3917+
3918+
/*
3919+
* (non-Javadoc)
3920+
* @see org.springframework.data.redis.connection.RedisStreamCommands#xInfoGroups(byte[])
3921+
*/
3922+
@Override
3923+
public XInfoGroups xInfoGroups(byte[] key) {
3924+
return delegate.xInfoGroups(key);
3925+
}
3926+
3927+
/*
3928+
* (non-Javadoc)
3929+
* @see org.springframework.data.redis.connection.RedisStreamCommands#xInfoConsumers(byte[], java.lang.String)
3930+
*/
3931+
@Override
3932+
public XInfoConsumers xInfoConsumers(byte[] key, String groupName) {
3933+
return delegate.xInfoConsumers(key, groupName);
3934+
}
3935+
38783936
/*
38793937
* (non-Javadoc)
38803938
* @see org.springframework.data.redis.connection.RedisStreamCommands#xLen(byte[])

src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@
3535
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
3636
import org.springframework.data.redis.connection.stream.ReadOffset;
3737
import org.springframework.data.redis.connection.stream.RecordId;
38+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers;
39+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroups;
40+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
3841
import org.springframework.data.redis.connection.stream.StreamOffset;
3942
import org.springframework.data.redis.connection.stream.StreamReadOptions;
4043
import org.springframework.data.redis.core.Cursor;
@@ -495,6 +498,27 @@ default Boolean xGroupDestroy(byte[] key, String groupName) {
495498
return streamCommands().xGroupDestroy(key, groupName);
496499
}
497500

501+
/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
502+
@Override
503+
@Deprecated
504+
default XInfoStream xInfo(byte[] key) {
505+
return streamCommands().xInfo(key);
506+
}
507+
508+
/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
509+
@Override
510+
@Deprecated
511+
default XInfoGroups xInfoGroups(byte[] key) {
512+
return streamCommands().xInfoGroups(key);
513+
}
514+
515+
/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
516+
@Override
517+
@Deprecated
518+
default XInfoConsumers xInfoConsumers(byte[] key, String groupName) {
519+
return streamCommands().xInfoConsumers(key, groupName);
520+
}
521+
498522
/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
499523
@Override
500524
@Deprecated

src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
import org.springframework.data.domain.Range;
2727
import org.springframework.data.redis.connection.RedisZSetCommands.Limit;
2828
import org.springframework.data.redis.connection.stream.*;
29+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers;
30+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroups;
31+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
2932
import org.springframework.lang.Nullable;
3033
import org.springframework.util.Assert;
3134
import org.springframework.util.ObjectUtils;
@@ -488,6 +491,39 @@ default Boolean xGroupDelConsumer(byte[] key, String groupName, String consumerN
488491
@Nullable
489492
Boolean xGroupDestroy(byte[] key, String groupName);
490493

494+
/**
495+
* Obtain general information about the stream stored at the specified {@literal key}.
496+
*
497+
* @param key the {@literal key} the stream is stored at.
498+
* @return {@literal null} when used in pipeline / transaction.
499+
* @since 2.3
500+
*/
501+
@Nullable
502+
XInfoStream xInfo(byte[] key);
503+
504+
/**
505+
* Obtain information about {@literal consumer groups} associated with the stream stored at the specified
506+
* {@literal key}.
507+
*
508+
* @param key the {@literal key} the stream is stored at.
509+
* @return {@literal null} when used in pipeline / transaction.
510+
* @since 2.3
511+
*/
512+
@Nullable
513+
XInfoGroups xInfoGroups(byte[] key);
514+
515+
/**
516+
* Obtain information about every consumer in a specific {@literal consumer group} for the stream stored at the
517+
* specified {@literal key}.
518+
*
519+
* @param key the {@literal key} the stream is stored at.
520+
* @param groupName name of the {@literal consumer group}.
521+
* @return {@literal null} when used in pipeline / transaction.
522+
* @since 2.3
523+
*/
524+
@Nullable
525+
XInfoConsumers xInfoConsumers(byte[] key, String groupName);
526+
491527
/**
492528
* Get the length of a stream.
493529
*

src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@
3434
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
3535
import org.springframework.data.redis.connection.stream.ReadOffset;
3636
import org.springframework.data.redis.connection.stream.RecordId;
37+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers;
38+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroups;
39+
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
3740
import org.springframework.data.redis.connection.stream.StreamOffset;
3841
import org.springframework.data.redis.connection.stream.StreamReadOptions;
3942
import org.springframework.data.redis.connection.stream.StreamRecords;
@@ -2128,6 +2131,39 @@ default Long xDel(String key, String... entryIds) {
21282131
@Nullable
21292132
Boolean xGroupDestroy(String key, String group);
21302133

2134+
/**
2135+
* Obtain general information about the stream stored at the specified {@literal key}.
2136+
*
2137+
* @param key the {@literal key} the stream is stored at.
2138+
* @return {@literal null} when used in pipeline / transaction.
2139+
* @since 2.3
2140+
*/
2141+
@Nullable
2142+
XInfoStream xInfo(String key);
2143+
2144+
/**
2145+
* Obtain information about {@literal consumer groups} associated with the stream stored at the specified
2146+
* {@literal key}.
2147+
*
2148+
* @param key the {@literal key} the stream is stored at.
2149+
* @return {@literal null} when used in pipeline / transaction.
2150+
* @since 2.3
2151+
*/
2152+
@Nullable
2153+
XInfoGroups xInfoGroups(String key);
2154+
2155+
/**
2156+
* Obtain information about every consumer in a specific {@literal consumer group} for the stream stored at the
2157+
* specified {@literal key}.
2158+
*
2159+
* @param key the {@literal key} the stream is stored at.
2160+
* @param groupName name of the {@literal consumer group}.
2161+
* @return {@literal null} when used in pipeline / transaction.
2162+
* @since 2.3
2163+
*/
2164+
@Nullable
2165+
XInfoConsumers xInfoConsumers(String key, String groupName);
2166+
21312167
/**
21322168
* Get the length of a stream.
21332169
*

src/main/java/org/springframework/data/redis/connection/convert/Converters.java

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,22 @@
1717

1818
import lombok.RequiredArgsConstructor;
1919

20+
import java.nio.ByteBuffer;
2021
import java.time.Duration;
21-
import java.util.*;
22+
import java.util.ArrayList;
23+
import java.util.Arrays;
24+
import java.util.Collection;
25+
import java.util.Collections;
26+
import java.util.LinkedHashMap;
27+
import java.util.LinkedHashSet;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.Properties;
31+
import java.util.Set;
2232
import java.util.concurrent.TimeUnit;
2333

34+
import org.apache.commons.logging.Log;
35+
import org.apache.commons.logging.LogFactory;
2436
import org.springframework.core.convert.converter.Converter;
2537
import org.springframework.data.geo.Distance;
2638
import org.springframework.data.geo.GeoResult;
@@ -38,8 +50,10 @@
3850
import org.springframework.data.redis.connection.RedisNode.NodeType;
3951
import org.springframework.data.redis.connection.RedisZSetCommands.Tuple;
4052
import org.springframework.data.redis.serializer.RedisSerializer;
53+
import org.springframework.data.redis.util.ByteUtils;
4154
import org.springframework.lang.Nullable;
4255
import org.springframework.util.Assert;
56+
import org.springframework.util.ClassUtils;
4357
import org.springframework.util.CollectionUtils;
4458
import org.springframework.util.NumberUtils;
4559
import org.springframework.util.ObjectUtils;
@@ -55,6 +69,9 @@
5569
*/
5670
abstract public class Converters {
5771

72+
// private static final Log LOGGER = LogFactory.getLog(Converters.class);
73+
private static final Log LOGGER = LogFactory.getLog(Converters.class);
74+
5875
private static final byte[] ONE = new byte[] { '1' };
5976
private static final byte[] ZERO = new byte[] { '0' };
6077
private static final String CLUSTER_NODES_LINE_SEPARATOR = "\n";
@@ -426,7 +443,103 @@ public static <K, V> Converter<Map<K, V>, Properties> mapToPropertiesConverter()
426443
@Nullable
427444
public static Duration secondsToDuration(@Nullable Long seconds) {
428445
return seconds != null ? Duration.ofSeconds(seconds) : null;
446+
}
447+
448+
/**
449+
* Parse a rather generic Redis response, such as a list of something into a meaningful structure applying best effort
450+
* conversion of {@code byte[]} and {@link ByteBuffer}.
451+
*
452+
* @param source the source to parse
453+
* @param targetType eg. {@link Map}, {@link String},...
454+
* @param <T>
455+
* @return
456+
* @since 2.3
457+
*/
458+
public static <T> T parse(Object source, Class<T> targetType) {
459+
return targetType.cast(parse(source, "root", Collections.singletonMap("root", targetType)));
460+
}
461+
462+
/**
463+
* Parse a rather generic Redis response, such as a list of something into a meaningful structure applying best effort
464+
* conversion of {@code byte[]} and {@link ByteBuffer} based on the {@literal sourcePath} and a {@literal typeHintMap}
465+
*
466+
* @param source the source to parse
467+
* @param sourcePath the current path (use "root", for level 0).
468+
* @param typeHintMap source path to target type hints allowing wildcards ({@literal *}).
469+
* @return
470+
* @since 2.3
471+
*/
472+
public static Object parse(Object source, String sourcePath, Map<String, Class<?>> typeHintMap) {
473+
474+
String path = sourcePath;
475+
Class<?> targetType = typeHintMap.get(path);
476+
477+
if (targetType == null) {
478+
479+
String alternatePath = sourcePath.contains(".") ? sourcePath.substring(0, sourcePath.lastIndexOf(".")) + ".*"
480+
: sourcePath;
481+
targetType = typeHintMap.get(alternatePath);
482+
if (targetType == null) {
483+
484+
if (sourcePath.endsWith("[]")) {
485+
targetType = String.class;
486+
} else {
487+
targetType = source.getClass();
488+
}
489+
490+
} else {
491+
if (targetType == Map.class && sourcePath.endsWith("[]")) {
492+
targetType = String.class;
493+
} else {
494+
path = alternatePath;
495+
}
496+
}
497+
}
498+
499+
if (LOGGER.isDebugEnabled()) {
500+
LOGGER.debug(String.format("parsing %s (%s) as %s.", sourcePath, path, targetType));
501+
}
502+
503+
if (targetType == Object.class) {
504+
return source;
505+
}
506+
507+
if (ClassUtils.isAssignable(String.class, targetType)) {
508+
509+
if (source instanceof String) {
510+
return source.toString();
511+
}
512+
if (source instanceof byte[]) {
513+
return new String((byte[]) source);
514+
}
515+
if (source instanceof ByteBuffer) {
516+
return new String(ByteUtils.getBytes((ByteBuffer) source));
517+
}
518+
}
519+
520+
if (ClassUtils.isAssignable(List.class, targetType) && source instanceof List) {
521+
522+
List<Object> sourceCollection = (List<Object>) source;
523+
List<Object> targetList = new ArrayList<>();
524+
for (int i = 0; i < sourceCollection.size(); i++) {
525+
targetList.add(parse(sourceCollection.get(i), sourcePath + ".[" + i + "]", typeHintMap));
526+
}
527+
return targetList;
528+
}
529+
530+
if (ClassUtils.isAssignable(Map.class, targetType) && source instanceof List) {
531+
532+
List<Object> sourceCollection = ((List<Object>) source);
533+
Map<String, Object> targetMap = new LinkedHashMap<>();
534+
for (int i = 0; i < sourceCollection.size(); i = i + 2) {
535+
536+
String key = parse(sourceCollection.get(i), path + ".[]", typeHintMap).toString();
537+
targetMap.put(key, parse(sourceCollection.get(i + 1), path + "." + key, typeHintMap));
538+
}
539+
return targetMap;
540+
}
429541

542+
return source;
430543
}
431544

432545
/**

0 commit comments

Comments
 (0)