Skip to content

Commit 2c14460

Browse files
christophstroblmp911de
authored andcommitted
DATAREDIS-624 - Upgrade to Reactor 3.1.0.BUILD-SNAPSHOT.
Translate `Mono.then` -> `flatMap`, `Mono.flatMap` -> `flatMapMany`. This still fails as we also need a compatible version of lettuce. Original pull request: spring-projects#245.
1 parent 04e3465 commit 2c14460

File tree

8 files changed

+24
-31
lines changed

8 files changed

+24
-31
lines changed

src/main/java/org/springframework/data/redis/connection/ReactiveGeoCommands.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ default Mono<String> geoHash(ByteBuffer key, ByteBuffer member) {
417417
Assert.notNull(member, "Member must not be null!");
418418

419419
return geoHash(key, Collections.singletonList(member)) //
420-
.then(vals -> vals.isEmpty() ? Mono.empty() : Mono.justOrEmpty(vals.iterator().next()));
420+
.flatMap(vals -> vals.isEmpty() ? Mono.empty() : Mono.justOrEmpty(vals.iterator().next()));
421421
}
422422

423423
/**
@@ -524,7 +524,7 @@ default Mono<Point> geoPos(ByteBuffer key, ByteBuffer member) {
524524
Assert.notNull(member, "Member must not be null!");
525525

526526
return geoPos(key, Collections.singletonList(member))
527-
.then(vals -> vals.isEmpty() ? Mono.empty() : Mono.justOrEmpty(vals.iterator().next()));
527+
.flatMap(vals -> vals.isEmpty() ? Mono.empty() : Mono.justOrEmpty(vals.iterator().next()));
528528
}
529529

530530
/**

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterKeyCommands.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public Flux<BooleanResponse<RenameCommand>> rename(Publisher<RenameCommand> comm
8888
return super.rename(Mono.just(command));
8989
}
9090

91-
Flux<Boolean> result = cmd.dump(command.getKey())
91+
Mono<Boolean> result = cmd.dump(command.getKey())
9292
.otherwiseIfEmpty(Mono.error(new RedisSystemException("Cannot rename key that does not exist",
9393
new RedisException("ERR no such key."))))
9494
.flatMap(value -> cmd.restore(command.getNewName(), 0, value).flatMap(res -> cmd.del(command.getKey())))
@@ -114,7 +114,7 @@ public Flux<BooleanResponse<RenameCommand>> renameNX(Publisher<RenameCommand> co
114114
return super.renameNX(Mono.just(command));
115115
}
116116

117-
Flux<Boolean> result = cmd.exists(command.getNewName()).flatMap(exists -> {
117+
Mono<Boolean> result = cmd.exists(command.getNewName()).flatMap(exists -> {
118118

119119
if (exists == 1) {
120120
return Mono.just(Boolean.FALSE);

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterListCommands.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public Flux<ByteBufferResponse<RPopLPushCommand>> rPopLPush(Publisher<RPopLPushC
7878
return super.rPopLPush(Mono.just(command));
7979
}
8080

81-
Flux<ByteBuffer> result = cmd.rpop(command.getKey())
81+
Mono<ByteBuffer> result = cmd.rpop(command.getKey())
8282
.flatMap(value -> cmd.lpush(command.getDestination(), value).map(x -> value));
8383

8484
return result.map(value -> new ByteBufferResponse<>(command, value));

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterSetCommands.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ public Flux<BooleanResponse<SMoveCommand>> sMove(Publisher<SMoveCommand> command
224224
return super.sMove(Mono.just(command));
225225
}
226226

227-
Flux<Boolean> result = cmd.exists(command.getKey())
227+
Mono<Boolean> result = cmd.exists(command.getKey())
228228
.flatMap(nrKeys -> nrKeys == 0 ? Mono.empty() : cmd.sismember(command.getKey(), command.getValue()))
229229
.flatMap(exists -> {
230230

src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ public Mono<List<GeoLocation<V>>> geoRadius(K key, Circle within) {
240240
Assert.notNull(within, "Circle must not be null!");
241241

242242
return createMono(connection -> connection.geoRadius(rawKey(key), within) //
243-
.flatMap(Flux::fromIterable) //
243+
.flatMapMany(Flux::fromIterable) //
244244
.map(location -> new GeoLocation<>(readValue(location.getName()), location.getPoint())) //
245245
.collectList());
246246
}
@@ -256,7 +256,7 @@ public Mono<GeoResults<GeoLocation<V>>> geoRadius(K key, Circle within, GeoRadiu
256256
Assert.notNull(args, "GeoRadiusCommandArgs must not be null!");
257257

258258
return createMono(connection -> connection.geoRadius(rawKey(key), within, args) //
259-
.flatMap(Flux::fromIterable) //
259+
.flatMapMany(Flux::fromIterable) //
260260
.map(geoResult -> new GeoResult<>(
261261
new GeoLocation<>(readValue(geoResult.getContent().getName()), geoResult.getContent().getPoint()),
262262
geoResult.getDistance())) //
@@ -274,7 +274,7 @@ public Mono<List<GeoLocation<V>>> geoRadiusByMember(K key, V member, double radi
274274
Assert.notNull(member, "Member must not be null!");
275275

276276
return createMono(connection -> connection.geoRadiusByMember(rawKey(key), rawValue(member), new Distance(radius)) //
277-
.flatMap(Flux::fromIterable) //
277+
.flatMapMany(Flux::fromIterable) //
278278
.map(geoLocation -> new GeoLocation<>(readValue(geoLocation.getName()), geoLocation.getPoint())) //
279279
.collectList());
280280
}
@@ -290,7 +290,7 @@ public Mono<List<GeoLocation<V>>> geoRadiusByMember(K key, V member, Distance di
290290
Assert.notNull(distance, "Distance must not be null!");
291291

292292
return createMono(connection -> connection.geoRadiusByMember(rawKey(key), rawValue(member), distance) //
293-
.flatMap(Flux::fromIterable) //
293+
.flatMapMany(Flux::fromIterable) //
294294
.map(geoLocation -> new GeoLocation<>(readValue(geoLocation.getName()), geoLocation.getPoint())) //
295295
.collectList());
296296
}
@@ -308,7 +308,7 @@ public Mono<GeoResults<GeoLocation<V>>> geoRadiusByMember(K key, V member, Dista
308308
Assert.notNull(args, "GeoRadiusCommandArgs must not be null!");
309309

310310
return createMono(connection -> connection.geoRadiusByMember(rawKey(key), rawValue(member), distance, args) //
311-
.flatMap(Flux::fromIterable) //
311+
.flatMapMany(Flux::fromIterable) //
312312
.map(geoResult -> new GeoResult<>(
313313
new GeoLocation<>(readValue(geoResult.getContent().getName()), geoResult.getContent().getPoint()),
314314
geoResult.getDistance())) //

src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public Mono<Long> remove(H key, Object... hashKeys) {
7474
return createMono(connection -> Flux.fromArray(hashKeys) //
7575
.map(o -> (HK) o).map(this::rawHashKey) //
7676
.collectList() //
77-
.then(hks -> connection.hDel(rawKey(key), hks)));
77+
.flatMap(hks -> connection.hDel(rawKey(key), hks)));
7878
}
7979

8080
/* (non-Javadoc)
@@ -116,7 +116,7 @@ public Mono<List<HV>> multiGet(H key, Collection<HK> hashKeys) {
116116
return createMono(connection -> Flux.fromIterable(hashKeys) //
117117
.map(this::rawHashKey) //
118118
.collectList() //
119-
.then(hks -> connection.hMGet(rawKey(key), hks)).map(this::deserializeHashValues));
119+
.flatMap(hks -> connection.hMGet(rawKey(key), hks)).map(this::deserializeHashValues));
120120
}
121121

122122
/* (non-Javadoc)
@@ -156,7 +156,7 @@ public Mono<List<HK>> keys(H key) {
156156
Assert.notNull(key, "Key must not be null!");
157157

158158
return createMono(connection -> connection.hKeys(rawKey(key)) //
159-
.flatMap(Flux::fromIterable) //
159+
.flatMapMany(Flux::fromIterable) //
160160
.map(this::readHashKey) //
161161
.collectList());
162162
}
@@ -221,7 +221,7 @@ public Mono<List<HV>> values(H key) {
221221
Assert.notNull(key, "Key must not be null!");
222222

223223
return createMono(connection -> connection.hVals(rawKey(key)) //
224-
.flatMap(Flux::fromIterable) //
224+
.flatMapMany(Flux::fromIterable) //
225225
.map(this::readHashValue) //
226226
.collectList());
227227
}

src/test/java/org/springframework/data/redis/core/DefaultReactiveListOperationsIntegrationTests.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public void leftPush() {
125125
StepVerifier.create(listOperations.leftPush(key, value1)).expectNext(1L).verifyComplete();
126126
StepVerifier.create(listOperations.leftPush(key, value2)).expectNext(2L).verifyComplete();
127127

128-
StepVerifier.create(listOperations.range(key, 0, -1).flatMap(Flux::fromIterable)).expectNext(value2)
128+
StepVerifier.create(listOperations.range(key, 0, -1).flatMapMany(Flux::fromIterable)).expectNext(value2)
129129
.expectNext(value1).verifyComplete();
130130
}
131131

@@ -140,7 +140,7 @@ public void leftPushAll() {
140140

141141
StepVerifier.create(listOperations.leftPushAll(key, value1, value2)).expectNext(2L).verifyComplete();
142142

143-
StepVerifier.create(listOperations.range(key, 0, -1).flatMap(Flux::fromIterable)).expectNext(value2)
143+
StepVerifier.create(listOperations.range(key, 0, -1).flatMapMany(Flux::fromIterable)).expectNext(value2)
144144
.expectNext(value1).verifyComplete();
145145
}
146146

@@ -170,7 +170,7 @@ public void leftPushWithPivot() {
170170

171171
StepVerifier.create(listOperations.leftPush(key, value1, value3)).expectNext(3L).verifyComplete();
172172

173-
StepVerifier.create(listOperations.range(key, 0, -1).flatMap(Flux::fromIterable)).expectNext(value2)
173+
StepVerifier.create(listOperations.range(key, 0, -1).flatMapMany(Flux::fromIterable)).expectNext(value2)
174174
.expectNext(value3).expectNext(value1).verifyComplete();
175175
}
176176

@@ -186,7 +186,7 @@ public void rightPush() {
186186
StepVerifier.create(listOperations.rightPush(key, value1)).expectNext(1L).verifyComplete();
187187
StepVerifier.create(listOperations.rightPush(key, value2)).expectNext(2L).verifyComplete();
188188

189-
StepVerifier.create(listOperations.range(key, 0, -1).flatMap(Flux::fromIterable)).expectNext(value1)
189+
StepVerifier.create(listOperations.range(key, 0, -1).flatMapMany(Flux::fromIterable)).expectNext(value1)
190190
.expectNext(value2).verifyComplete();
191191
}
192192

@@ -201,7 +201,7 @@ public void rightPushAll() {
201201

202202
StepVerifier.create(listOperations.rightPushAll(key, value1, value2)).expectNext(2L).verifyComplete();
203203

204-
StepVerifier.create(listOperations.range(key, 0, -1).flatMap(Flux::fromIterable)).expectNext(value1)
204+
StepVerifier.create(listOperations.range(key, 0, -1).flatMapMany(Flux::fromIterable)).expectNext(value1)
205205
.expectNext(value2).verifyComplete();
206206
}
207207

@@ -231,7 +231,7 @@ public void rightPushWithPivot() {
231231

232232
StepVerifier.create(listOperations.rightPush(key, value1, value3)).expectNext(3L).verifyComplete();
233233

234-
StepVerifier.create(listOperations.range(key, 0, -1).flatMap(Flux::fromIterable)).expectNext(value1)
234+
StepVerifier.create(listOperations.range(key, 0, -1).flatMapMany(Flux::fromIterable)).expectNext(value1)
235235
.expectNext(value3).expectNext(value2).verifyComplete();
236236
}
237237

@@ -248,7 +248,7 @@ public void set() {
248248

249249
StepVerifier.create(listOperations.set(key, 1, value1)).expectNext(true).verifyComplete();
250250

251-
StepVerifier.create(listOperations.range(key, 0, -1).flatMap(Flux::fromIterable)).expectNext(value1)
251+
StepVerifier.create(listOperations.range(key, 0, -1).flatMapMany(Flux::fromIterable)).expectNext(value1)
252252
.expectNext(value1).verifyComplete();
253253
}
254254

@@ -265,7 +265,7 @@ public void remove() {
265265

266266
StepVerifier.create(listOperations.remove(key, 1, value1)).expectNext(1L).verifyComplete();
267267

268-
StepVerifier.create(listOperations.range(key, 0, -1).flatMap(Flux::fromIterable)).expectNext(value2)
268+
StepVerifier.create(listOperations.range(key, 0, -1).flatMapMany(Flux::fromIterable)).expectNext(value2)
269269
.verifyComplete();
270270
}
271271

src/test/java/reactor/test/TestSubscriber.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@
3737
import org.reactivestreams.Subscription;
3838

3939
import reactor.core.Fuseable;
40-
import reactor.core.Receiver;
41-
import reactor.core.Trackable;
4240
import reactor.core.publisher.Operators;
4341

4442
/**
@@ -95,7 +93,7 @@
9593
* @author Stephane Maldini
9694
* @author Brian Clozel
9795
*/
98-
public class TestSubscriber<T> implements Subscriber<T>, Subscription, Trackable, Receiver {
96+
public class TestSubscriber<T> implements Subscriber<T>, Subscription {
9997

10098
/**
10199
* Default timeout for waiting next values to be received
@@ -839,17 +837,14 @@ public void cancel() {
839837
}
840838
}
841839

842-
@Override
843840
public final boolean isCancelled() {
844841
return s == Operators.cancelledSubscription();
845842
}
846843

847-
@Override
848844
public final boolean isStarted() {
849845
return s != null;
850846
}
851847

852-
@Override
853848
public final boolean isTerminated() {
854849
return isCancelled();
855850
}
@@ -953,7 +948,6 @@ public void request(long n) {
953948
}
954949
}
955950

956-
@Override
957951
public final long requestedFromDownstream() {
958952
return requested;
959953
}
@@ -969,7 +963,6 @@ public final TestSubscriber<T> requestedFusionMode(int requestMode) {
969963
return this;
970964
}
971965

972-
@Override
973966
public Subscription upstream() {
974967
return s;
975968
}

0 commit comments

Comments
 (0)