Skip to content

Commit d427928

Browse files
christophstroblThomas Darimont
authored andcommitted
DATAREDIS-290 - Add support for SCAN command.
SCAN is currently only supported by jedis but can be emulated for lettuce via eval. Srp and JRedis will respond with UnsupportedOperationException. We provide a Cursor implementation allowing to scroll through the responses provided by the underlying connection. The cursor will fetch additional results from redis server whenever its starting point has not been reached and there are no more already loaded items available. Only values returned by the last call to redis server are kept in memory which allows scrolling through the entire collection without potentially running into memory issues. Original pull request: spring-projects#71.
1 parent 3414d2a commit d427928

File tree

12 files changed

+876
-0
lines changed

12 files changed

+876
-0
lines changed

src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.springframework.data.redis.connection.convert.ListConverter;
3333
import org.springframework.data.redis.connection.convert.MapConverter;
3434
import org.springframework.data.redis.connection.convert.SetConverter;
35+
import org.springframework.data.redis.core.Cursor;
36+
import org.springframework.data.redis.core.ScanOptions;
3537
import org.springframework.data.redis.core.types.RedisClientInfo;
3638
import org.springframework.data.redis.serializer.RedisSerializer;
3739
import org.springframework.data.redis.serializer.StringRedisSerializer;
@@ -2215,6 +2217,15 @@ public void slaveOfNoOne() {
22152217
this.delegate.slaveOfNoOne();
22162218
}
22172219

2220+
/*
2221+
* (non-Javadoc)
2222+
* @see org.springframework.data.redis.connection.RedisKeyCommands#scan(org.springframework.data.redis.core.ScanOptions)
2223+
*/
2224+
@Override
2225+
public Cursor<byte[]> scan(ScanOptions options) {
2226+
return this.delegate.scan(options);
2227+
}
2228+
22182229
/**
22192230
* Specifies if pipelined and tx results should be deserialized to Strings. If false, results of
22202231
* {@link #closePipeline()} and {@link #exec()} will be of the type returned by the underlying connection

src/main/java/org/springframework/data/redis/connection/RedisKeyCommands.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
import java.util.List;
1919
import java.util.Set;
2020

21+
import org.springframework.data.redis.core.Cursor;
22+
import org.springframework.data.redis.core.ScanOptions;
23+
2124
/**
2225
* Key-specific commands supported by Redis.
2326
*
@@ -62,6 +65,17 @@ public interface RedisKeyCommands {
6265
*/
6366
Set<byte[]> keys(byte[] pattern);
6467

68+
/**
69+
* Use a {@link Cursor} to iterate over keys. <br />
70+
*
71+
* @see http://redis.io/commands/scan
72+
* @param count
73+
* @param pattern
74+
* @return
75+
* @since 1.4
76+
*/
77+
Cursor<byte[]> scan(ScanOptions options);
78+
6579
/**
6680
* Return a random key from the keyspace.
6781
*

src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,15 @@
4242
import org.springframework.data.redis.connection.Subscription;
4343
import org.springframework.data.redis.connection.convert.Converters;
4444
import org.springframework.data.redis.connection.convert.TransactionResultConverter;
45+
import org.springframework.data.redis.core.Cursor;
46+
import org.springframework.data.redis.core.ScanCursor;
47+
import org.springframework.data.redis.core.ScanIteration;
48+
import org.springframework.data.redis.core.ScanOptions;
4549
import org.springframework.data.redis.core.types.RedisClientInfo;
4650
import org.springframework.util.Assert;
4751
import org.springframework.util.ObjectUtils;
4852
import org.springframework.util.ReflectionUtils;
53+
import org.springframework.util.StringUtils;
4954

5055
import redis.clients.jedis.BinaryJedis;
5156
import redis.clients.jedis.BinaryJedisPubSub;
@@ -58,6 +63,7 @@
5863
import redis.clients.jedis.Protocol.Command;
5964
import redis.clients.jedis.Queable;
6065
import redis.clients.jedis.Response;
66+
import redis.clients.jedis.ScanParams;
6167
import redis.clients.jedis.SortingParams;
6268
import redis.clients.jedis.Transaction;
6369
import redis.clients.jedis.ZParams;
@@ -2900,6 +2906,47 @@ public void slaveOfNoOne() {
29002906
}
29012907
}
29022908

2909+
public Cursor<byte[]> scan() {
2910+
return scan(ScanOptions.NONE);
2911+
}
2912+
2913+
/*
2914+
* (non-Javadoc)
2915+
* @see org.springframework.data.redis.connection.RedisKeyCommands#scan(org.springframework.data.redis.core.ScanOptions)
2916+
*/
2917+
public Cursor<byte[]> scan(ScanOptions options) {
2918+
return scan(0, options != null ? options : ScanOptions.NONE);
2919+
}
2920+
2921+
public Cursor<byte[]> scan(long cursorId, ScanOptions options) {
2922+
2923+
return new ScanCursor<byte[]>(cursorId, options) {
2924+
2925+
@Override
2926+
protected ScanIteration<byte[]> doScan(long cursorId, ScanOptions options) {
2927+
2928+
if (isQueueing() || isPipelined()) {
2929+
throw new UnsupportedOperationException("'SCAN' cannot be called in pipeline / transaction mode.");
2930+
}
2931+
2932+
ScanParams sp = new ScanParams();
2933+
if (!options.equals(ScanOptions.NONE)) {
2934+
if (options.getCount() != null) {
2935+
sp.count(options.getCount().intValue());
2936+
}
2937+
if (StringUtils.hasText(options.getPattern())) {
2938+
sp.match(options.getPattern());
2939+
}
2940+
}
2941+
2942+
redis.clients.jedis.ScanResult<String> result = jedis.scan(Long.toString(cursorId), sp);
2943+
return new ScanIteration<byte[]>(Long.valueOf(result.getStringCursor()), JedisConverters.stringListToByteList()
2944+
.convert(result.getResult()));
2945+
}
2946+
}.open();
2947+
2948+
}
2949+
29032950
/**
29042951
* Specifies if pipelined results should be converted to the expected data type. If false, results of
29052952
* {@link #closePipeline()} and {@link #exec()} will be of the type returned by the Jedis driver

src/main/java/org/springframework/data/redis/connection/jredis/JredisConnection.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import org.springframework.data.redis.connection.ReturnType;
4444
import org.springframework.data.redis.connection.SortParameters;
4545
import org.springframework.data.redis.connection.Subscription;
46+
import org.springframework.data.redis.core.Cursor;
47+
import org.springframework.data.redis.core.ScanOptions;
4648
import org.springframework.data.redis.core.types.RedisClientInfo;
4749
import org.springframework.util.Assert;
4850
import org.springframework.util.ObjectUtils;
@@ -1236,4 +1238,13 @@ public void slaveOfNoOne() {
12361238
throw convertJredisAccessException(e);
12371239
}
12381240
}
1241+
1242+
/*
1243+
* (non-Javadoc)
1244+
* @see org.springframework.data.redis.connection.RedisKeyCommands#scan(org.springframework.data.redis.core.ScanOptions)
1245+
*/
1246+
@Override
1247+
public Cursor<byte[]> scan(ScanOptions options) {
1248+
throw new UnsupportedOperationException("'SCAN' command is not supported for jredis.");
1249+
}
12391250
}

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,16 @@
4949
import org.springframework.data.redis.connection.Subscription;
5050
import org.springframework.data.redis.connection.convert.Converters;
5151
import org.springframework.data.redis.connection.convert.TransactionResultConverter;
52+
import org.springframework.data.redis.core.Cursor;
5253
import org.springframework.data.redis.core.RedisCommand;
54+
import org.springframework.data.redis.core.ScanCursor;
55+
import org.springframework.data.redis.core.ScanIteration;
56+
import org.springframework.data.redis.core.ScanOptions;
5357
import org.springframework.data.redis.core.types.RedisClientInfo;
5458
import org.springframework.util.Assert;
5559
import org.springframework.util.ClassUtils;
5660
import org.springframework.util.ObjectUtils;
61+
import org.springframework.util.StringUtils;
5762

5863
import com.lambdaworks.redis.RedisAsyncConnection;
5964
import com.lambdaworks.redis.RedisClient;
@@ -3027,6 +3032,51 @@ public void slaveOfNoOne() {
30273032
}
30283033
}
30293034

3035+
public Cursor<byte[]> scan() {
3036+
return scan(0, ScanOptions.NONE);
3037+
}
3038+
3039+
/*
3040+
* (non-Javadoc)
3041+
* @see org.springframework.data.redis.connection.RedisKeyCommands#scan(org.springframework.data.redis.core.ScanOptions)
3042+
*/
3043+
public Cursor<byte[]> scan(ScanOptions options) {
3044+
return scan(0, options != null ? options : ScanOptions.NONE);
3045+
}
3046+
3047+
public Cursor<byte[]> scan(long cursorId, ScanOptions options) {
3048+
3049+
return new ScanCursor<byte[]>(cursorId, options) {
3050+
3051+
@SuppressWarnings("unchecked")
3052+
@Override
3053+
protected ScanIteration<byte[]> doScan(long cursorId, ScanOptions options) {
3054+
3055+
if (isQueueing() || isPipelined()) {
3056+
throw new UnsupportedOperationException("'SCAN' cannot be called in pipeline / transaction mode.");
3057+
}
3058+
3059+
String params = " ," + cursorId;
3060+
if (!options.equals(ScanOptions.NONE)) {
3061+
if (options.getCount() != null) {
3062+
params += (", 'count', " + options.getCount());
3063+
}
3064+
if (StringUtils.hasText(options.getPattern())) {
3065+
params += (", 'match' , '" + options.getPattern() + "'");
3066+
}
3067+
}
3068+
3069+
String script = "return redis.call('SCAN'" + params + ")";
3070+
3071+
List<?> result = eval(script.getBytes(), ReturnType.MULTI, 0);
3072+
String nextCursorId = LettuceConverters.bytesToString().convert((byte[]) result.get(0));
3073+
3074+
return new ScanIteration<byte[]>(Long.valueOf(nextCursorId), ((ArrayList<byte[]>) result.get(1)));
3075+
}
3076+
}.open();
3077+
3078+
}
3079+
30303080
/**
30313081
* Specifies if pipelined and transaction results should be converted to the expected data type. If false, results of
30323082
* {@link #closePipeline()} and {@link #exec()} will be of the type returned by the Lettuce driver

src/main/java/org/springframework/data/redis/connection/srp/SrpConnection.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
import org.springframework.data.redis.connection.ReturnType;
4141
import org.springframework.data.redis.connection.SortParameters;
4242
import org.springframework.data.redis.connection.Subscription;
43+
import org.springframework.data.redis.core.Cursor;
44+
import org.springframework.data.redis.core.ScanOptions;
4345
import org.springframework.data.redis.core.types.RedisClientInfo;
4446
import org.springframework.util.Assert;
4547

@@ -2331,6 +2333,15 @@ public void slaveOfNoOne() {
23312333
}
23322334
}
23332335

2336+
/*
2337+
* (non-Javadoc)
2338+
* @see org.springframework.data.redis.connection.RedisKeyCommands#scan(org.springframework.data.redis.core.ScanOptions)
2339+
*/
2340+
@Override
2341+
public Cursor<byte[]> scan(ScanOptions options) {
2342+
throw new UnsupportedOperationException("'SCAN' command is not supported for Srp.");
2343+
}
2344+
23342345
private List<Object> closeTransaction() {
23352346
List<Object> results = Collections.emptyList();
23362347
if (txTracker != null) {
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2014 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.redis.core;
17+
18+
import java.io.Closeable;
19+
import java.util.Iterator;
20+
21+
/**
22+
* @author Christoph Strobl
23+
* @param <T>
24+
* @since 1.4
25+
*/
26+
public interface Cursor<T> extends Iterator<T>, Closeable {
27+
28+
/**
29+
* Get the reference cursor. <br />
30+
* <strong>NOTE:</strong> the id might change while iterating items.
31+
*
32+
* @return
33+
*/
34+
long getCursorId();
35+
36+
/**
37+
* @return Returns true if cursor closed.
38+
*/
39+
boolean isClosed();
40+
41+
/**
42+
* Opens cursor and returns itself.
43+
*
44+
* @return
45+
*/
46+
Cursor<T> open();
47+
48+
/**
49+
* @return Returns the current position of the cursor.
50+
*/
51+
long getPosition();
52+
53+
}

0 commit comments

Comments
 (0)