Skip to content

Commit 2c39f46

Browse files
author
diego Dupin
committed
[misc] adding option transactionReplaySize to control redo cache size
1 parent 5aae10e commit 2c39f46

File tree

10 files changed

+122
-72
lines changed

10 files changed

+122
-72
lines changed

src/main/java/org/mariadb/jdbc/Configuration.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ public class Configuration {
127127
private int retriesAllDown = 120;
128128
private String galeraAllowedState = null;
129129
private boolean transactionReplay = false;
130+
private int transactionReplaySize = 64;
130131

131132
// Pool options
132133
private boolean pool = false;
@@ -202,6 +203,7 @@ private Configuration(
202203
int retriesAllDown,
203204
String galeraAllowedState,
204205
boolean transactionReplay,
206+
int transactionReplaySize,
205207
boolean pool,
206208
String poolName,
207209
int maxPoolSize,
@@ -267,6 +269,7 @@ private Configuration(
267269
this.retriesAllDown = retriesAllDown;
268270
this.galeraAllowedState = galeraAllowedState;
269271
this.transactionReplay = transactionReplay;
272+
this.transactionReplaySize = transactionReplaySize;
270273
this.pool = pool;
271274
this.poolName = poolName;
272275
this.maxPoolSize = maxPoolSize;
@@ -343,6 +346,7 @@ private Configuration(
343346
Boolean useReadAheadInput,
344347
Boolean cachePrepStmts,
345348
Boolean transactionReplay,
349+
Integer transactionReplaySize,
346350
String geometryDefaultType,
347351
String restrictedAuth,
348352
Properties nonMappedOptions)
@@ -421,6 +425,7 @@ private Configuration(
421425
if (useReadAheadInput != null) this.useReadAheadInput = useReadAheadInput;
422426
if (cachePrepStmts != null) this.cachePrepStmts = cachePrepStmts;
423427
if (transactionReplay != null) this.transactionReplay = transactionReplay;
428+
if (transactionReplaySize != null) this.transactionReplaySize = transactionReplaySize;
424429
if (geometryDefaultType != null) this.geometryDefaultType = geometryDefaultType;
425430
if (restrictedAuth != null) this.restrictedAuth = restrictedAuth;
426431
if (serverSslCert != null) this.serverSslCert = serverSslCert;
@@ -720,6 +725,7 @@ public Configuration clone(String username, String password) {
720725
this.retriesAllDown,
721726
this.galeraAllowedState,
722727
this.transactionReplay,
728+
this.transactionReplaySize,
723729
this.pool,
724730
this.poolName,
725731
this.maxPoolSize,
@@ -993,6 +999,10 @@ public boolean transactionReplay() {
993999
return transactionReplay;
9941000
}
9951001

1002+
public int transactionReplaySize() {
1003+
return transactionReplaySize;
1004+
}
1005+
9961006
public String geometryDefaultType() {
9971007
return geometryDefaultType;
9981008
}
@@ -1235,6 +1245,7 @@ public static final class Builder implements Cloneable {
12351245
private Integer retriesAllDown;
12361246
private String galeraAllowedState;
12371247
private Boolean transactionReplay;
1248+
private Integer transactionReplaySize;
12381249

12391250
// Pool options
12401251
private Boolean pool;
@@ -1714,6 +1725,11 @@ public Builder transactionReplay(Boolean transactionReplay) {
17141725
return this;
17151726
}
17161727

1728+
public Builder transactionReplaySize(Integer transactionReplaySize) {
1729+
this.transactionReplaySize = transactionReplaySize;
1730+
return this;
1731+
}
1732+
17171733
public Configuration build() throws SQLException {
17181734
Configuration conf =
17191735
new Configuration(
@@ -1779,6 +1795,7 @@ public Configuration build() throws SQLException {
17791795
this.useReadAheadInput,
17801796
this.cachePrepStmts,
17811797
this.transactionReplay,
1798+
this.transactionReplaySize,
17821799
this.geometryDefaultType,
17831800
this.restrictedAuth,
17841801
this._nonMappedOptions);

src/main/java/org/mariadb/jdbc/ServerPreparedStatement.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,21 +50,13 @@ public ServerPreparedStatement(
5050
resultSetType,
5151
resultSetConcurrency,
5252
defaultFetchSize);
53-
if (!PREPARABLE_STATEMENT_PATTERN.matcher(sql).find()) {
54-
prepareIfNotAlready(sql);
53+
prepareResult = con.getContext().getPrepareCache().get(sql, this);
54+
if (prepareResult == null && !PREPARABLE_STATEMENT_PATTERN.matcher(sql).find()) {
55+
con.getClient().execute(new PreparePacket(sql), this);
5556
}
5657
parameters = new ParameterList();
5758
}
5859

59-
private void prepareIfNotAlready(String cmd) throws SQLException {
60-
if (prepareResult == null) {
61-
prepareResult = con.getContext().getPrepareCache().get(cmd, this);
62-
if (prepareResult == null) {
63-
con.getClient().execute(new PreparePacket(cmd), this);
64-
}
65-
}
66-
}
67-
6860
protected void executeInternal() throws SQLException {
6961
checkNotClosed();
7062
validParameters();
@@ -119,7 +111,12 @@ private void executePipeline(String cmd) throws SQLException {
119111

120112
private void executeStandard(String cmd) throws SQLException {
121113
// send COM_STMT_PREPARE
122-
prepareIfNotAlready(cmd);
114+
if (prepareResult == null) {
115+
prepareResult = con.getContext().getPrepareCache().get(cmd, this);
116+
if (prepareResult == null) {
117+
con.getClient().execute(new PreparePacket(cmd), this);
118+
}
119+
}
123120

124121
// send COM_STMT_EXECUTE
125122
ExecutePacket execute = new ExecutePacket(prepareResult, parameters, cmd, this);

src/main/java/org/mariadb/jdbc/client/context/RedoContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public RedoContext(
2424
ExceptionFactory exceptionFactory,
2525
PrepareCache prepareCache) {
2626
super(handshake, clientCapabilities, conf, exceptionFactory, prepareCache);
27-
this.transactionSaver = new TransactionSaver();
27+
this.transactionSaver = new TransactionSaver(conf.transactionReplaySize());
2828
}
2929

3030
public void setServerStatus(int serverStatus) {

src/main/java/org/mariadb/jdbc/client/impl/MultiPrimaryClient.java

Lines changed: 49 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ protected Client connectHost(boolean readOnly, boolean failFast) throws SQLExcep
134134
throw lastSqle;
135135
}
136136

137-
protected void reConnect() throws SQLException {
137+
protected Client reConnect() throws SQLException {
138138

139139
denyList.putIfAbsent(
140140
currentClient.getHostAddress(), System.currentTimeMillis() + deniedListTimeout);
@@ -146,35 +146,54 @@ protected void reConnect() throws SQLException {
146146

147147
currentClient = connectHost(false, false);
148148
syncNewState(oldClient);
149+
return oldClient;
149150

150-
if (conf.transactionReplay()) {
151-
executeTransactionReplay(oldClient);
152-
} else if ((oldClient.getContext().getServerStatus() & ServerStatus.IN_TRANSACTION) > 0) {
153-
// transaction is lost, but connection is now up again.
151+
} catch (SQLNonTransientConnectionException sqle) {
152+
currentClient = null;
153+
closed = true;
154+
throw sqle;
155+
}
156+
}
157+
158+
protected void replayIfPossible(Client oldClient) throws SQLException {
159+
// oldClient is only valued if this occurs on master.
160+
if (oldClient != null) {
161+
if ((oldClient.getContext().getServerStatus() & ServerStatus.IN_TRANSACTION) > 0) {
162+
if (conf.transactionReplay()) {
163+
executeTransactionReplay(oldClient);
164+
} else {
165+
// transaction is lost, but connection is now up again.
166+
// changing exception to SQLTransientConnectionException
167+
throw new SQLTransientConnectionException(
168+
String.format(
169+
"Driver has reconnect connection after a communications link failure with %s. In progress transaction was lost",
170+
oldClient.getHostAddress()),
171+
"25S03");
172+
}
173+
} else {
174+
// no transaction, but connection is now up again.
154175
// changing exception to SQLTransientConnectionException
155176
throw new SQLTransientConnectionException(
156177
String.format(
157-
"Driver has reconnect connection after a "
158-
+ "communications "
159-
+ "link "
160-
+ "failure with %s. In progress transaction was lost",
178+
"Driver has reconnect connection after a communications link failure with %s",
161179
oldClient.getHostAddress()),
162180
"25S03");
163181
}
164-
165-
} catch (SQLNonTransientConnectionException sqle) {
166-
currentClient = null;
167-
closed = true;
168-
throw sqle;
169182
}
170183
}
171184

172185
protected void executeTransactionReplay(Client oldCli) throws SQLException {
173186
// transaction replay
174-
if ((oldCli.getContext().getServerStatus() & ServerStatus.IN_TRANSACTION) > 0) {
175-
RedoContext ctx = (RedoContext) oldCli.getContext();
176-
((ReplayClient) currentClient).transactionReplay(ctx.getTransactionSaver());
187+
RedoContext ctx = (RedoContext) oldCli.getContext();
188+
if (ctx.getTransactionSaver().isDirty()) {
189+
ctx.getTransactionSaver().clear();
190+
throw new SQLTransientConnectionException(
191+
String.format(
192+
"Driver has reconnect connection after a communications link failure with %s. In progress transaction was too big to be replayed, and was lost",
193+
oldCli.getHostAddress()),
194+
"25S03");
177195
}
196+
((ReplayClient) currentClient).transactionReplay(ctx.getTransactionSaver());
178197
}
179198

180199
public void syncNewState(Client oldCli) throws SQLException {
@@ -271,18 +290,18 @@ public List<Completion> execute(
271290
closeOnCompletion);
272291
} catch (SQLNonTransientConnectionException e) {
273292
HostAddress hostAddress = currentClient.getHostAddress();
274-
reConnect();
293+
Client oldClient = reConnect();
275294

276295
if (message instanceof QueryPacket && ((QueryPacket) message).isCommit()) {
277296
throw new SQLTransientConnectionException(
278297
String.format(
279-
"Driver has reconnect connection after a "
280-
+ "communications "
281-
+ "failure with %s during a COMMIT statement",
298+
"Driver has reconnect connection after a communications failure with %s during a COMMIT statement",
282299
hostAddress),
283300
"25S03");
284301
}
285302

303+
replayIfPossible(oldClient);
304+
286305
if (message instanceof RedoableWithPrepareClientMessage) {
287306
((RedoableWithPrepareClientMessage) message).rePrepare(currentClient);
288307
}
@@ -323,7 +342,8 @@ public List<Completion> executePipeline(
323342
} catch (SQLException e) {
324343
if (e instanceof SQLNonTransientConnectionException
325344
|| (e.getCause() != null && e.getCause() instanceof SQLNonTransientConnectionException)) {
326-
reConnect();
345+
Client oldClient = reConnect();
346+
replayIfPossible(oldClient);
327347
Arrays.stream(messages)
328348
.filter(RedoableWithPrepareClientMessage.class::isInstance)
329349
.map(RedoableWithPrepareClientMessage.class::cast)
@@ -365,8 +385,13 @@ public void readStreamingResults(
365385
currentClient.readStreamingResults(
366386
completions, fetchSize, maxRows, resultSetConcurrency, resultSetType, closeOnCompletion);
367387
} catch (SQLNonTransientConnectionException e) {
368-
reConnect();
369-
throw getExceptionFactory().create("Socket error during result streaming", "HY000");
388+
try {
389+
reConnect();
390+
} catch (SQLException e2) {
391+
throw getExceptionFactory()
392+
.create("Socket error during result streaming", e2.getSQLState(), e2);
393+
}
394+
throw getExceptionFactory().create("Socket error during result streaming", "HY000", e);
370395
}
371396
}
372397

src/main/java/org/mariadb/jdbc/client/impl/MultiPrimaryReplicaClient.java

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
import java.sql.SQLException;
88
import java.sql.SQLNonTransientConnectionException;
9-
import java.sql.SQLTransientConnectionException;
109
import java.util.List;
1110
import java.util.concurrent.Executor;
1211
import java.util.concurrent.locks.ReentrantLock;
@@ -19,7 +18,6 @@
1918
import org.mariadb.jdbc.export.ExceptionFactory;
2019
import org.mariadb.jdbc.export.Prepare;
2120
import org.mariadb.jdbc.message.ClientMessage;
22-
import org.mariadb.jdbc.util.constants.ServerStatus;
2321
import org.mariadb.jdbc.util.log.Logger;
2422
import org.mariadb.jdbc.util.log.Loggers;
2523

@@ -91,7 +89,7 @@ private void reconnectIfNeeded() {
9189
* @throws SQLException if exception
9290
*/
9391
@Override
94-
protected void reConnect() throws SQLException {
92+
protected Client reConnect() throws SQLException {
9593
denyList.putIfAbsent(
9694
currentClient.getHostAddress(), System.currentTimeMillis() + deniedListTimeout);
9795
logger.info("Connection error on {}", currentClient.getHostAddress());
@@ -155,22 +153,7 @@ protected void reConnect() throws SQLException {
155153

156154
// if reconnect succeed on replica / use master, no problem, continuing without interruption
157155
// if reconnect primary, then replay transaction / throw exception if was in transaction.
158-
if (!requestReadOnly) {
159-
if (conf.transactionReplay()) {
160-
executeTransactionReplay(oldClient);
161-
} else if ((oldClient.getContext().getServerStatus() & ServerStatus.IN_TRANSACTION) > 0) {
162-
// transaction is lost, but connection is now up again.
163-
// changing exception to SQLTransientConnectionException
164-
throw new SQLTransientConnectionException(
165-
String.format(
166-
"Driver has reconnect connection after a "
167-
+ "communications "
168-
+ "link "
169-
+ "failure with %s. In progress transaction was lost",
170-
oldClient.getHostAddress()),
171-
"25S03");
172-
}
173-
}
156+
return requestReadOnly ? null : oldClient;
174157

175158
} catch (SQLNonTransientConnectionException sqle) {
176159
currentClient = null;

src/main/java/org/mariadb/jdbc/client/impl/ReplayClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,12 @@ public List<Completion> execute(
103103
}
104104

105105
public void transactionReplay(TransactionSaver transactionSaver) throws SQLException {
106-
List<RedoableClientMessage> buffers = transactionSaver.getBuffers();
106+
RedoableClientMessage[] buffers = transactionSaver.getBuffers();
107107
try {
108108
// replay all but last
109109
Prepare prepare;
110-
for (RedoableClientMessage querySaver : buffers) {
110+
for (int i = 0; i < transactionSaver.getIdx(); i++) {
111+
RedoableClientMessage querySaver = buffers[i];
111112
int responseNo;
112113
if (querySaver instanceof RedoableWithPrepareClientMessage) {
113114
// command is a prepare statement query

src/main/java/org/mariadb/jdbc/client/impl/TransactionSaver.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,41 @@
44

55
package org.mariadb.jdbc.client.impl;
66

7-
import java.util.ArrayList;
8-
import java.util.List;
7+
import java.util.Arrays;
98
import org.mariadb.jdbc.message.client.RedoableClientMessage;
109

1110
public class TransactionSaver {
12-
private final List<RedoableClientMessage> buffers = new ArrayList<>();
11+
private final RedoableClientMessage[] buffers;
12+
private int idx = 0;
13+
private boolean dirty = false;
14+
15+
public TransactionSaver(int transactionReplaySize) {
16+
buffers = new RedoableClientMessage[transactionReplaySize];
17+
}
1318

1419
public void add(RedoableClientMessage clientMessage) {
15-
buffers.add(clientMessage);
20+
if (idx < buffers.length) {
21+
buffers[idx++] = clientMessage;
22+
} else {
23+
dirty = true;
24+
}
1625
}
1726

1827
public void clear() {
19-
buffers.clear();
28+
Arrays.fill(buffers, null);
29+
dirty = false;
30+
idx = 0;
31+
}
32+
33+
public int getIdx() {
34+
return idx;
35+
}
36+
37+
public boolean isDirty() {
38+
return dirty;
2039
}
2140

22-
public List<RedoableClientMessage> getBuffers() {
41+
public RedoableClientMessage[] getBuffers() {
2342
return buffers;
2443
}
2544
}

src/main/resources/driver.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ useReadAheadInput=use a buffered inputSteam that read socket available data. Thi
4949
cachePrepStmts=enable/disable prepare Statement cache. When enable, PreparedStatement.close won't close prepare immediately, keeping a pool of most used prepared results. Default true.
5050
timezone=Driver set connection timezone to client timezone. Value default to client default time-zone ID, but can be set using this option. If you are sure that server use the same timezone, setting connection timezone can be disabled using value 'disable'.
5151
transactionReplay=When having a failover, can current transaction beeing re-executed, having a completly transparent failover. All commands must be idempotent. Default false.
52+
transactionReplaySize=replay cache buffer maximum size. If a transaction has more command that this size and a failover occurs, transaction will then not be replayed, just throwing an exception error. (Integer) default 64.
5253
allowLocalInfile=Indicate if LOAD DATA LOCAL INFILE commands are permitted. This will disable all pipelining implementation. Default false.
5354
geometryDefaultType=Indicate what default Object type Geometry a resultset.getObject must return. null or empty is WKB byte array. 'default' will return org.mariadb.mariadb.jdbc.type Object. Default null
5455
keyStore=File path of the keyStore file that contain client private key store and associate certificates (similar to java System property "javax.net.ssl.keyStore", but ensure that only the private key's entries are used)

0 commit comments

Comments
 (0)