Skip to content

Commit 9b0d405

Browse files
DATAREDIS-1011 - Polishing.
Fix copyright headers and update documentation. Original Pull Request: spring-projects#511
1 parent 42ad8d1 commit 9b0d405

File tree

5 files changed

+38
-13
lines changed

5 files changed

+38
-13
lines changed

src/main/asciidoc/reference/pipelining.adoc

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,18 @@ List<Object> results = stringRedisTemplate.executePipelined(
2222

2323
The preceding example runs a bulk right pop of items from a queue in a pipeline. The `results` `List` contains all of the popped items. `RedisTemplate` uses its value, hash key, and hash value serializers to deserialize all results before returning, so the returned items in the preceding example are Strings. There are additional `executePipelined` methods that let you pass a custom serializer for pipelined results.
2424

25-
Note that the value returned from the `RedisCallback` is required to be null, as this value is discarded in favor of returning the results of the pipelined commands.
25+
Note that the value returned from the `RedisCallback` is required to be `null`, as this value is discarded in favor of returning the results of the pipelined commands.
26+
27+
[TIP]
28+
====
29+
The Lettuce driver supports fine grained flush control that allows to either flush commands as they appear, buffer or send them at connection close.
30+
31+
[source,java]
32+
----
33+
LettuceConnectionFactory factory = // ...
34+
factory.setPipeliningFlushPolicy(PipeliningFlushPolicy.buffered(3)); <1>
35+
----
36+
<1> Buffer locally and flush after every 3rd command.
37+
====
2638

2739
include::version-note.adoc[]

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@
6565
import org.springframework.data.redis.FallbackExceptionTranslationStrategy;
6666
import org.springframework.data.redis.connection.*;
6767
import org.springframework.data.redis.connection.convert.TransactionResultConverter;
68+
import org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider.TargetAware;
6869
import org.springframework.data.redis.connection.lettuce.LettuceResult.LettuceResultBuilder;
6970
import org.springframework.data.redis.connection.lettuce.LettuceResult.LettuceStatusResult;
70-
import org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider.TargetAware;
7171
import org.springframework.data.redis.core.RedisCommand;
7272
import org.springframework.lang.Nullable;
7373
import org.springframework.util.Assert;
@@ -518,6 +518,7 @@ public boolean isPipelined() {
518518
*/
519519
@Override
520520
public void openPipeline() {
521+
521522
if (!isPipelined) {
522523
isPipelined = true;
523524
ppline = new ArrayList<>();
@@ -877,6 +878,7 @@ public void setConvertPipelineAndTxResults(boolean convertPipelineAndTxResults)
877878
* @see PipeliningFlushPolicy#flushEachCommand()
878879
* @see #openPipeline()
879880
* @see StatefulRedisConnection#flushCommands()
881+
* @since 2.3
880882
*/
881883
public void setPipeliningFlushPolicy(PipeliningFlushPolicy pipeliningFlushPolicy) {
882884

@@ -923,15 +925,14 @@ protected LettuceSubscription doCreateSubscription(MessageListener listener,
923925
}
924926

925927
void pipeline(LettuceResult result) {
926-
if (isQueueing()) {
927928

928-
if (flushState != null) {
929-
flushState.onCommand(getOrCreateDedicatedConnection());
930-
}
929+
if (flushState != null) {
930+
flushState.onCommand(getOrCreateDedicatedConnection());
931+
}
931932

933+
if (isQueueing()) {
932934
transaction(result);
933935
} else {
934-
flushState.onCommand(getOrCreateDedicatedConnection());
935936
ppline.add(result);
936937
}
937938
}
@@ -1361,6 +1362,8 @@ public void release(StatefulConnection<?, ?> connection) {
13611362
*
13621363
* @see StatefulRedisConnection#setAutoFlushCommands(boolean)
13631364
* @see StatefulRedisConnection#flushCommands()
1365+
* @author Mark Paluch
1366+
* @since 2.3
13641367
*/
13651368
public interface PipeliningFlushPolicy {
13661369

@@ -1401,6 +1404,9 @@ static PipeliningFlushPolicy buffered(int bufferSize) {
14011404

14021405
/**
14031406
* State object associated with flushing of the currently ongoing pipeline.
1407+
*
1408+
* @author Mark Paluch
1409+
* @since 2.3
14041410
*/
14051411
public interface PipeliningFlushState {
14061412

@@ -1431,6 +1437,8 @@ public interface PipeliningFlushState {
14311437

14321438
/**
14331439
* Implementation to flush on each command.
1440+
* @author Mark Paluch
1441+
* @since 2.3
14341442
*/
14351443
private enum FlushEachCommand implements PipeliningFlushPolicy, PipeliningFlushState {
14361444

@@ -1453,6 +1461,8 @@ public void onClose(StatefulConnection<?, ?> connection) {}
14531461

14541462
/**
14551463
* Implementation to flush on closing the pipeline.
1464+
* @author Mark Paluch
1465+
* @since 2.3
14561466
*/
14571467
private enum FlushOnClose implements PipeliningFlushPolicy, PipeliningFlushState {
14581468

@@ -1475,13 +1485,15 @@ public void onCommand(StatefulConnection<?, ?> connection) {
14751485

14761486
@Override
14771487
public void onClose(StatefulConnection<?, ?> connection) {
1478-
connection.setAutoFlushCommands(true);
14791488
connection.flushCommands();
1489+
connection.setAutoFlushCommands(true);
14801490
}
14811491
}
14821492

14831493
/**
14841494
* Pipeline state for buffered flushing.
1495+
* @author Mark Paluch
1496+
* @since 2.3
14851497
*/
14861498
private static class BufferedFlushing implements PipeliningFlushState {
14871499

@@ -1507,8 +1519,8 @@ public void onCommand(StatefulConnection<?, ?> connection) {
15071519

15081520
@Override
15091521
public void onClose(StatefulConnection<?, ?> connection) {
1510-
connection.setAutoFlushCommands(true);
15111522
connection.flushCommands();
1523+
connection.setAutoFlushCommands(true);
15121524
}
15131525
}
15141526
}

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -560,12 +560,13 @@ public void setPort(int port) {
560560
}
561561

562562
/**
563-
* Configures the flushing policy when using pipelining. Defaults to {@link PipeliningFlushPolicy#flushEachCommand()
564-
* flush on each command}.
563+
* Configures the flushing policy when using pipelining. If not set, defaults to
564+
* {@link PipeliningFlushPolicy#flushEachCommand() flush on each command}.
565565
*
566566
* @param pipeliningFlushPolicy the flushing policy to control when commands get written to the Redis connection.
567567
* @see LettuceConnection#openPipeline()
568568
* @see StatefulRedisConnection#flushCommands()
569+
* @since 2.3
569570
*/
570571
public void setPipeliningFlushPolicy(PipeliningFlushPolicy pipeliningFlushPolicy) {
571572

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionPipelineFlushOnEndIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2011-2020 the original author or authors.
2+
* Copyright 2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionPipelineTxFlushOnEndIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2020 the original author or authors.
2+
* Copyright 2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

0 commit comments

Comments
 (0)