Skip to content

Commit 34d1759

Browse files
christophstroblThomas Darimont
authored andcommitted
DATAREDIS-304 - Add support for SSCAN command.
SSCAN command is natively supported by jedis and can be emulated for lettuce. JRedis and SRP will throw UnsupportedOperationException. sScan is available on RedisConneciton and RedisSetOperations returning basically a Cursor that allows iteration over the defined values for a given key. Currently the jedis driver does not directly expose the binary version of sscan which leads to invalid results when using a non String compatible converter along with the RedisTemplate. We’ll change this as soon as a newer version of jedis is available. Along the way loading behavior of ScanCursor has been fixed, preventing it from indicating next values available when there effectively are no more. Orignal pull request: spring-projects#76.
1 parent 5f4e9db commit 34d1759

18 files changed

+530
-80
lines changed

src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.springframework.data.redis.connection.convert.ListConverter;
3333
import org.springframework.data.redis.connection.convert.MapConverter;
3434
import org.springframework.data.redis.connection.convert.SetConverter;
35+
import org.springframework.data.redis.core.ConvertingCursor;
3536
import org.springframework.data.redis.core.Cursor;
3637
import org.springframework.data.redis.core.ScanOptions;
3738
import org.springframework.data.redis.core.types.RedisClientInfo;
@@ -2226,6 +2227,15 @@ public Cursor<byte[]> scan(ScanOptions options) {
22262227
return this.delegate.scan(options);
22272228
}
22282229

2230+
/*
2231+
* (non-Javadoc)
2232+
* @see org.springframework.data.redis.connection.RedisSetCommands#scan(byte[], org.springframework.data.redis.core.ScanOptions)
2233+
*/
2234+
@Override
2235+
public Cursor<byte[]> sScan(byte[] key, ScanOptions options) {
2236+
return this.delegate.sScan(key, options);
2237+
}
2238+
22292239
/**
22302240
* Specifies if pipelined and tx results should be deserialized to Strings. If false, results of
22312241
* {@link #closePipeline()} and {@link #exec()} will be of the type returned by the underlying connection
@@ -2299,4 +2309,23 @@ public void killClient(String host, int port) {
22992309
public String getClientName() {
23002310
return this.delegate.getClientName();
23012311
}
2312+
2313+
/*
2314+
* (non-Javadoc)
2315+
* @see org.springframework.data.redis.connection.StringRedisConnection#sScan(java.lang.String, org.springframework.data.redis.core.ScanOptions)
2316+
*/
2317+
@Override
2318+
public Cursor<String> sScan(String key, ScanOptions options) {
2319+
2320+
return new ConvertingCursor<byte[], String>(this.delegate.sScan(this.serialize(key), options),
2321+
new Converter<byte[], String>() {
2322+
2323+
@Override
2324+
public String convert(byte[] source) {
2325+
return serializer.deserialize(source);
2326+
}
2327+
});
2328+
2329+
}
2330+
23022331
}

src/main/java/org/springframework/data/redis/connection/RedisSetCommands.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
import java.util.List;
1919
import java.util.Set;
2020

21+
import org.springframework.data.redis.core.Cursor;
22+
import org.springframework.data.redis.core.ScanOptions;
23+
2124
/**
2225
* Set-specific commands supported by Redis.
2326
*
@@ -166,4 +169,15 @@ public interface RedisSetCommands {
166169
* @return
167170
*/
168171
List<byte[]> sRandMember(byte[] key, long count);
172+
173+
/**
174+
* Use a {@link Cursor} to iterate over elements in set at {@code key}.
175+
*
176+
* @since 1.4
177+
* @see http://redis.io/commands/scan
178+
* @param key
179+
* @param options
180+
* @return
181+
*/
182+
Cursor<byte[]> sScan(byte[] key, ScanOptions options);
169183
}

src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import java.util.Map;
2121
import java.util.Set;
2222

23+
import org.springframework.data.redis.core.Cursor;
2324
import org.springframework.data.redis.core.RedisCallback;
25+
import org.springframework.data.redis.core.ScanOptions;
2426
import org.springframework.data.redis.core.StringRedisTemplate;
2527
import org.springframework.data.redis.core.types.RedisClientInfo;
2628
import org.springframework.data.redis.serializer.RedisSerializer;
@@ -316,4 +318,13 @@ public interface StringTuple extends Tuple {
316318
* @since 1.3
317319
*/
318320
List<RedisClientInfo> getClientList();
321+
322+
/**
323+
* @see RedisSetCommands#sScan(byte[], ScanOptions)
324+
* @param key
325+
* @param options
326+
* @return
327+
* @since 1.4
328+
*/
329+
Cursor<String> sScan(String key, ScanOptions options);
319330
}

src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.springframework.data.redis.connection.convert.Converters;
4444
import org.springframework.data.redis.connection.convert.TransactionResultConverter;
4545
import org.springframework.data.redis.core.Cursor;
46+
import org.springframework.data.redis.core.KeyBoundCursor;
4647
import org.springframework.data.redis.core.ScanCursor;
4748
import org.springframework.data.redis.core.ScanIteration;
4849
import org.springframework.data.redis.core.ScanOptions;
@@ -2918,6 +2919,12 @@ public Cursor<byte[]> scan(ScanOptions options) {
29182919
return scan(0, options != null ? options : ScanOptions.NONE);
29192920
}
29202921

2922+
/**
2923+
* @since 1.4
2924+
* @param cursorId
2925+
* @param options
2926+
* @return
2927+
*/
29212928
public Cursor<byte[]> scan(long cursorId, ScanOptions options) {
29222929

29232930
return new ScanCursor<byte[]>(cursorId, options) {
@@ -2929,22 +2936,65 @@ protected ScanIteration<byte[]> doScan(long cursorId, ScanOptions options) {
29292936
throw new UnsupportedOperationException("'SCAN' cannot be called in pipeline / transaction mode.");
29302937
}
29312938

2932-
ScanParams sp = new ScanParams();
2933-
if (!options.equals(ScanOptions.NONE)) {
2934-
if (options.getCount() != null) {
2935-
sp.count(options.getCount().intValue());
2936-
}
2937-
if (StringUtils.hasText(options.getPattern())) {
2938-
sp.match(options.getPattern());
2939-
}
2939+
ScanParams params = prepareScanParams(options);
2940+
redis.clients.jedis.ScanResult<String> result = jedis.scan(Long.toString(cursorId), params);
2941+
return new ScanIteration<byte[]>(Long.valueOf(result.getStringCursor()), JedisConverters.stringListToByteList()
2942+
.convert(result.getResult()));
2943+
}
2944+
2945+
}.open();
2946+
2947+
}
2948+
2949+
/*
2950+
* (non-Javadoc)
2951+
* @see org.springframework.data.redis.connection.RedisSetCommands#sScan(byte[], org.springframework.data.redis.core.ScanOptions)
2952+
*/
2953+
@Override
2954+
public Cursor<byte[]> sScan(byte[] key, ScanOptions options) {
2955+
return sScan(key, 0, options);
2956+
}
2957+
2958+
/**
2959+
* @since 1.4
2960+
* @param key
2961+
* @param cursorId
2962+
* @param options
2963+
* @return
2964+
*/
2965+
public Cursor<byte[]> sScan(byte[] key, long cursorId, ScanOptions options) {
2966+
2967+
return new KeyBoundCursor<byte[]>(key, cursorId, options) {
2968+
2969+
@Override
2970+
protected ScanIteration<byte[]> doScan(byte[] key, long cursorId, ScanOptions options) {
2971+
2972+
if (isQueueing() || isPipelined()) {
2973+
throw new UnsupportedOperationException("'SSCAN' cannot be called in pipeline / transaction mode.");
29402974
}
29412975

2942-
redis.clients.jedis.ScanResult<String> result = jedis.scan(Long.toString(cursorId), sp);
2976+
ScanParams params = prepareScanParams(options);
2977+
2978+
// TODO: use binary version of jedis.sscan (in v.2.4.3) to avoid potentially invalid representations.
2979+
redis.clients.jedis.ScanResult<String> result = jedis.sscan(JedisConverters.toString(key),
2980+
Long.toString(cursorId), params);
29432981
return new ScanIteration<byte[]>(Long.valueOf(result.getStringCursor()), JedisConverters.stringListToByteList()
29442982
.convert(result.getResult()));
29452983
}
29462984
}.open();
2985+
}
29472986

2987+
private ScanParams prepareScanParams(ScanOptions options) {
2988+
ScanParams sp = new ScanParams();
2989+
if (!options.equals(ScanOptions.NONE)) {
2990+
if (options.getCount() != null) {
2991+
sp.count(options.getCount().intValue());
2992+
}
2993+
if (StringUtils.hasText(options.getPattern())) {
2994+
sp.match(options.getPattern());
2995+
}
2996+
}
2997+
return sp;
29482998
}
29492999

29503000
/**

src/main/java/org/springframework/data/redis/connection/jredis/JredisConnection.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1247,4 +1247,13 @@ public void slaveOfNoOne() {
12471247
public Cursor<byte[]> scan(ScanOptions options) {
12481248
throw new UnsupportedOperationException("'SCAN' command is not supported for jredis.");
12491249
}
1250+
1251+
/*
1252+
* (non-Javadoc)
1253+
* @see org.springframework.data.redis.connection.RedisSetCommands#sScan(byte[], org.springframework.data.redis.core.ScanOptions)
1254+
*/
1255+
@Override
1256+
public Cursor<byte[]> sScan(byte[] key, ScanOptions options) {
1257+
throw new UnsupportedOperationException("'SSCAN' command is not uspported for jredis");
1258+
}
12501259
}

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java

Lines changed: 61 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.springframework.data.redis.connection.convert.Converters;
5151
import org.springframework.data.redis.connection.convert.TransactionResultConverter;
5252
import org.springframework.data.redis.core.Cursor;
53+
import org.springframework.data.redis.core.KeyBoundCursor;
5354
import org.springframework.data.redis.core.RedisCommand;
5455
import org.springframework.data.redis.core.ScanCursor;
5556
import org.springframework.data.redis.core.ScanIteration;
@@ -3044,6 +3045,12 @@ public Cursor<byte[]> scan(ScanOptions options) {
30443045
return scan(0, options != null ? options : ScanOptions.NONE);
30453046
}
30463047

3048+
/**
3049+
* @since 1.4
3050+
* @param cursorId
3051+
* @param options
3052+
* @return
3053+
*/
30473054
public Cursor<byte[]> scan(long cursorId, ScanOptions options) {
30483055

30493056
return new ScanCursor<byte[]>(cursorId, options) {
@@ -3056,25 +3063,70 @@ protected ScanIteration<byte[]> doScan(long cursorId, ScanOptions options) {
30563063
throw new UnsupportedOperationException("'SCAN' cannot be called in pipeline / transaction mode.");
30573064
}
30583065

3059-
String params = " ," + cursorId;
3060-
if (!options.equals(ScanOptions.NONE)) {
3061-
if (options.getCount() != null) {
3062-
params += (", 'count', " + options.getCount());
3063-
}
3064-
if (StringUtils.hasText(options.getPattern())) {
3065-
params += (", 'match' , '" + options.getPattern() + "'");
3066-
}
3066+
String params = " ," + cursorId + prepareScanParams(options);
3067+
String script = "return redis.call('SCAN'" + params + ")";
3068+
3069+
List<?> result = eval(script.getBytes(), ReturnType.MULTI, 0);
3070+
String nextCursorId = LettuceConverters.bytesToString().convert((byte[]) result.get(0));
3071+
3072+
return new ScanIteration<byte[]>(Long.valueOf(nextCursorId), ((ArrayList<byte[]>) result.get(1)));
3073+
}
3074+
}.open();
3075+
3076+
}
3077+
3078+
/*
3079+
* (non-Javadoc)
3080+
* @see org.springframework.data.redis.connection.RedisSetCommands#sScan(byte[], org.springframework.data.redis.core.ScanOptions)
3081+
*/
3082+
@Override
3083+
public Cursor<byte[]> sScan(byte[] key, ScanOptions options) {
3084+
return sScan(key, 0, options);
3085+
}
3086+
3087+
/**
3088+
* @since 1.4
3089+
* @param key
3090+
* @param cursorId
3091+
* @param options
3092+
* @return
3093+
*/
3094+
public Cursor<byte[]> sScan(byte[] key, long cursorId, ScanOptions options) {
3095+
3096+
return new KeyBoundCursor<byte[]>(key, cursorId, options) {
3097+
3098+
@SuppressWarnings("unchecked")
3099+
@Override
3100+
protected ScanIteration<byte[]> doScan(byte[] key, long cursorId, ScanOptions options) {
3101+
3102+
if (isQueueing() || isPipelined()) {
3103+
throw new UnsupportedOperationException("'SSCAN' cannot be called in pipeline / transaction mode.");
30673104
}
30683105

3069-
String script = "return redis.call('SCAN'" + params + ")";
3106+
String params = " ,'" + LettuceConverters.bytesToString().convert(key) + "', " + cursorId
3107+
+ prepareScanParams(options);
3108+
String script = "return redis.call('SSCAN'" + params + ")";
30703109

30713110
List<?> result = eval(script.getBytes(), ReturnType.MULTI, 0);
30723111
String nextCursorId = LettuceConverters.bytesToString().convert((byte[]) result.get(0));
30733112

30743113
return new ScanIteration<byte[]>(Long.valueOf(nextCursorId), ((ArrayList<byte[]>) result.get(1)));
30753114
}
30763115
}.open();
3116+
}
30773117

3118+
private String prepareScanParams(ScanOptions options) {
3119+
3120+
String params = "";
3121+
if (!options.equals(ScanOptions.NONE)) {
3122+
if (options.getCount() != null) {
3123+
params += (", 'count', " + options.getCount());
3124+
}
3125+
if (StringUtils.hasText(options.getPattern())) {
3126+
params += (", 'match' , '" + options.getPattern() + "'");
3127+
}
3128+
}
3129+
return params;
30783130
}
30793131

30803132
/**

src/main/java/org/springframework/data/redis/connection/srp/SrpConnection.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2342,6 +2342,15 @@ public Cursor<byte[]> scan(ScanOptions options) {
23422342
throw new UnsupportedOperationException("'SCAN' command is not supported for Srp.");
23432343
}
23442344

2345+
/*
2346+
* (non-Javadoc)
2347+
* @see org.springframework.data.redis.connection.RedisSetCommands#sScan(byte[], org.springframework.data.redis.core.ScanOptions)
2348+
*/
2349+
@Override
2350+
public Cursor<byte[]> sScan(byte[] key, ScanOptions options) {
2351+
throw new UnsupportedOperationException("'SSCAN' command is not supported for Srp.");
2352+
}
2353+
23452354
private List<Object> closeTransaction() {
23462355
List<Object> results = Collections.emptyList();
23472356
if (txTracker != null) {

src/main/java/org/springframework/data/redis/core/BoundSetOperations.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.data.redis.core;
1818

1919
import java.util.Collection;
20+
import java.util.Iterator;
2021
import java.util.List;
2122
import java.util.Set;
2223

@@ -72,4 +73,11 @@ public interface BoundSetOperations<K, V> extends BoundKeyOperations<K> {
7273
V pop();
7374

7475
Long size();
76+
77+
/**
78+
* @param options
79+
* @return
80+
* @since 1.4
81+
*/
82+
Iterator<V> sScan(ScanOptions options);
7583
}

0 commit comments

Comments
 (0)