Skip to content

Commit bbfe934

Browse files
committed
[misc] ensure replay is only done when set.
When transaction replay is enable, all streaming parameters will be saved in byte array, to permit replaying query. Then skipping LONG DATA since length is known. using only one command.
1 parent d2e36a3 commit bbfe934

34 files changed

+379
-221
lines changed

src/main/java/org/mariadb/jdbc/Connection.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,7 @@ public Connection(Configuration conf, ReentrantLock lock, Client client) throws
8585
* @throws SQLException never thrown
8686
*/
8787
public void cancelCurrentQuery() throws SQLException {
88-
try (Client cli =
89-
new ClientImpl(conf, client.getHostAddress(), false, new ReentrantLock(), true)) {
88+
try (Client cli = new ClientImpl(conf, client.getHostAddress(), new ReentrantLock(), true)) {
9089
cli.execute(new QueryPacket("KILL QUERY " + client.getContext().getThreadId()));
9190
}
9291
}

src/main/java/org/mariadb/jdbc/Driver.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,7 @@
3232
import java.util.List;
3333
import java.util.Properties;
3434
import java.util.concurrent.locks.ReentrantLock;
35-
import org.mariadb.jdbc.client.Client;
36-
import org.mariadb.jdbc.client.ClientImpl;
37-
import org.mariadb.jdbc.client.MultiPrimaryClient;
38-
import org.mariadb.jdbc.client.MultiPrimaryReplicaClient;
35+
import org.mariadb.jdbc.client.*;
3936
import org.mariadb.jdbc.util.Version;
4037

4138
public final class Driver implements java.sql.Driver {
@@ -65,7 +62,10 @@ protected static Connection connect(Configuration configuration) throws SQLExcep
6562

6663
default:
6764
hostAddress = configuration.addresses().get(0);
68-
client = new ClientImpl(configuration, hostAddress, false, lock, false);
65+
client =
66+
configuration.transactionReplay()
67+
? new ClientReplayImpl(configuration, hostAddress, lock, false)
68+
: new ClientImpl(configuration, hostAddress, lock, false);
6969
break;
7070
}
7171
return new Connection(configuration, lock, client);

src/main/java/org/mariadb/jdbc/client/ClientImpl.java

Lines changed: 13 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.mariadb.jdbc.ServerPreparedStatement;
4141
import org.mariadb.jdbc.client.context.BaseContext;
4242
import org.mariadb.jdbc.client.context.Context;
43-
import org.mariadb.jdbc.client.context.RedoContext;
4443
import org.mariadb.jdbc.client.result.Result;
4544
import org.mariadb.jdbc.client.result.StreamingResult;
4645
import org.mariadb.jdbc.client.socket.*;
@@ -65,26 +64,22 @@ public class ClientImpl implements Client, AutoCloseable {
6564
private static Integer MAX_ALLOWED_PACKET = 0;
6665

6766
private final Socket socket;
68-
private final Context context;
6967
private final MutableInt sequence = new MutableInt();
7068
private final MutableInt compressionSequence = new MutableInt();
7169
private final ReentrantLock lock;
7270
private final Configuration conf;
7371
private final HostAddress hostAddress;
7472
private boolean closed = false;
7573
private ExceptionFactory exceptionFactory;
76-
private PacketWriter writer;
74+
protected PacketWriter writer;
7775
private PacketReader reader;
7876
private org.mariadb.jdbc.Statement streamStmt = null;
7977
private ClientMessage streamMsg = null;
8078
private int socketTimeout;
79+
protected Context context;
8180

8281
public ClientImpl(
83-
Configuration conf,
84-
HostAddress hostAddress,
85-
boolean saveTransaction,
86-
ReentrantLock lock,
87-
boolean skipPostCommands)
82+
Configuration conf, HostAddress hostAddress, ReentrantLock lock, boolean skipPostCommands)
8883
throws SQLException {
8984

9085
this.conf = conf;
@@ -116,17 +111,11 @@ public ClientImpl(
116111
InitialHandshakePacket.decode(reader.readReadablePacket(true));
117112
this.exceptionFactory.setThreadId(handshake.getThreadId());
118113
this.context =
119-
conf.transactionReplay()
120-
? new RedoContext(
121-
handshake,
122-
conf,
123-
this.exceptionFactory,
124-
new PrepareCache(conf.prepStmtCacheSize(), this))
125-
: new BaseContext(
126-
handshake,
127-
conf,
128-
this.exceptionFactory,
129-
new PrepareCache(conf.prepStmtCacheSize(), this));
114+
new BaseContext(
115+
handshake,
116+
conf,
117+
this.exceptionFactory,
118+
new PrepareCache(conf.prepStmtCacheSize(), this));
130119
this.reader.setServerThreadId(handshake.getThreadId(), hostAddress);
131120
this.writer.setServerThreadId(handshake.getThreadId(), hostAddress);
132121

@@ -227,7 +216,7 @@ private void assignStream(
227216
}
228217

229218
/** Closing socket in case of Connection error after socket creation. */
230-
private void destroySocket() {
219+
protected void destroySocket() {
231220
closed = true;
232221
try {
233222
this.reader.close();
@@ -470,7 +459,6 @@ public List<Completion> executePipeline(
470459
closeOnCompletion));
471460
}
472461
}
473-
context.saveRedo(messages);
474462
return results;
475463
} catch (SQLException sqlException) {
476464

@@ -523,17 +511,8 @@ public List<Completion> execute(
523511
boolean closeOnCompletion)
524512
throws SQLException {
525513
sendQuery(message);
526-
List<Completion> completions =
527-
readResponse(
528-
stmt,
529-
message,
530-
fetchSize,
531-
maxRows,
532-
resultSetConcurrency,
533-
resultSetType,
534-
closeOnCompletion);
535-
context.saveRedo(message);
536-
return completions;
514+
return readResponse(
515+
stmt, message, fetchSize, maxRows, resultSetConcurrency, resultSetType, closeOnCompletion);
537516
}
538517

539518
public List<Completion> readResponse(
@@ -738,7 +717,7 @@ public Completion readPacket(
738717
}
739718
}
740719

741-
private void checkNotClosed() throws SQLException {
720+
protected void checkNotClosed() throws SQLException {
742721
if (closed) {
743722
throw exceptionFactory.create("Connection is closed", "08000", 1220);
744723
}
@@ -799,7 +778,7 @@ public void abort(Executor executor) throws SQLException {
799778
if (!lockStatus) {
800779
// lock not available : query is running
801780
// force end by executing an KILL connection
802-
try (ClientImpl cli = new ClientImpl(conf, hostAddress, false, new ReentrantLock(), true)) {
781+
try (ClientImpl cli = new ClientImpl(conf, hostAddress, new ReentrantLock(), true)) {
803782
cli.execute(new QueryPacket("KILL " + context.getThreadId()));
804783
} catch (SQLException e) {
805784
// eat
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* MariaDB Client for Java
3+
*
4+
* Copyright (c) 2012-2014 Monty Program Ab.
5+
* Copyright (c) 2015-2020 MariaDB Corporation Ab.
6+
*
7+
* This library is free software; you can redistribute it and/or modify it under
8+
* the terms of the GNU Lesser General Public License as published by the Free
9+
* Software Foundation; either version 2.1 of the License, or (at your option)
10+
* any later version.
11+
*
12+
* This library is distributed in the hope that it will be useful, but
13+
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15+
* for more details.
16+
*
17+
* You should have received a copy of the GNU Lesser General Public License along
18+
* with this library; if not, write to Monty Program Ab info@montyprogram.com.
19+
*
20+
*/
21+
22+
package org.mariadb.jdbc.client;
23+
24+
import java.io.IOException;
25+
import java.sql.SQLException;
26+
import java.util.List;
27+
import java.util.concurrent.locks.ReentrantLock;
28+
import org.mariadb.jdbc.Configuration;
29+
import org.mariadb.jdbc.HostAddress;
30+
import org.mariadb.jdbc.client.context.BaseContext;
31+
import org.mariadb.jdbc.client.context.RedoContext;
32+
import org.mariadb.jdbc.client.socket.*;
33+
import org.mariadb.jdbc.message.client.*;
34+
import org.mariadb.jdbc.message.server.Completion;
35+
import org.mariadb.jdbc.message.server.PrepareResultPacket;
36+
import org.mariadb.jdbc.util.log.Logger;
37+
import org.mariadb.jdbc.util.log.Loggers;
38+
39+
public class ClientReplayImpl extends ClientImpl {
40+
private static final Logger logger = Loggers.getLogger(ClientReplayImpl.class);
41+
42+
public ClientReplayImpl(
43+
Configuration conf, HostAddress hostAddress, ReentrantLock lock, boolean skipPostCommands)
44+
throws SQLException {
45+
super(conf, hostAddress, lock, skipPostCommands);
46+
this.context = RedoContext.from((BaseContext) this.context);
47+
}
48+
49+
@Override
50+
public int sendQuery(ClientMessage message) throws SQLException {
51+
checkNotClosed();
52+
try {
53+
if (message instanceof RedoableClientMessage)
54+
((RedoableClientMessage) message).ensureReplayable(context);
55+
return message.encode(writer, context);
56+
} catch (IOException ioException) {
57+
destroySocket();
58+
throw context
59+
.getExceptionFactory()
60+
.withSql(message.description())
61+
.create("Socket error", "08000", ioException);
62+
}
63+
}
64+
65+
@Override
66+
public List<Completion> executePipeline(
67+
ClientMessage[] messages,
68+
org.mariadb.jdbc.Statement stmt,
69+
int fetchSize,
70+
long maxRows,
71+
int resultSetConcurrency,
72+
int resultSetType,
73+
boolean closeOnCompletion)
74+
throws SQLException {
75+
List<Completion> res =
76+
super.executePipeline(
77+
messages,
78+
stmt,
79+
fetchSize,
80+
maxRows,
81+
resultSetConcurrency,
82+
resultSetType,
83+
closeOnCompletion);
84+
context.saveRedo(messages);
85+
return res;
86+
}
87+
88+
@Override
89+
public List<Completion> execute(
90+
ClientMessage message,
91+
org.mariadb.jdbc.Statement stmt,
92+
int fetchSize,
93+
long maxRows,
94+
int resultSetConcurrency,
95+
int resultSetType,
96+
boolean closeOnCompletion)
97+
throws SQLException {
98+
List<Completion> completions =
99+
super.execute(
100+
message,
101+
stmt,
102+
fetchSize,
103+
maxRows,
104+
resultSetConcurrency,
105+
resultSetType,
106+
closeOnCompletion);
107+
context.saveRedo(message);
108+
return completions;
109+
}
110+
111+
public void transactionReplay(TransactionSaver transactionSaver) throws SQLException {
112+
List<RedoableClientMessage> buffers = transactionSaver.getBuffers();
113+
try {
114+
// replay all but last
115+
PrepareResultPacket prepare;
116+
for (RedoableClientMessage querySaver : buffers) {
117+
int responseNo;
118+
if (querySaver instanceof RedoableWithPrepareClientMessage) {
119+
// command is a prepare statement query
120+
// redo on new connection need to re-prepare query
121+
// and substitute statement id
122+
RedoableWithPrepareClientMessage redoable =
123+
((RedoableWithPrepareClientMessage) querySaver);
124+
String cmd = redoable.getCommand();
125+
prepare = context.getPrepareCache().get(cmd, redoable.prep());
126+
if (prepare == null) {
127+
PreparePacket preparePacket = new PreparePacket(cmd);
128+
sendQuery(preparePacket);
129+
prepare = (PrepareResultPacket) readPacket(preparePacket);
130+
}
131+
responseNo = querySaver.reEncode(writer, context, prepare);
132+
} else {
133+
responseNo = querySaver.reEncode(writer, context, null);
134+
}
135+
for (int j = 0; j < responseNo; j++) {
136+
readResponse(querySaver);
137+
}
138+
}
139+
} catch (IOException e) {
140+
throw context
141+
.getExceptionFactory()
142+
.create("Socket error during transaction replay", "08000", e);
143+
}
144+
}
145+
}

src/main/java/org/mariadb/jdbc/client/MultiPrimaryClient.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,9 @@ protected Client connectHost(boolean readOnly, boolean failFast) throws SQLExcep
8686
while ((host = conf.haMode().getAvailableHost(conf.addresses(), denyList, !readOnly))
8787
.isPresent()) {
8888
try {
89-
return new ClientImpl(conf, host.get(), true, lock, false);
89+
return conf.transactionReplay()
90+
? new ClientReplayImpl(conf, host.get(), lock, false)
91+
: new ClientImpl(conf, host.get(), lock, false);
9092
} catch (SQLNonTransientConnectionException sqle) {
9193
lastSqle = sqle;
9294
denyList.putIfAbsent(host.get(), System.currentTimeMillis() + DENY_TIMEOUT);
@@ -109,7 +111,10 @@ protected Client connectHost(boolean readOnly, boolean failFast) throws SQLExcep
109111
.findFirst()
110112
.map(Map.Entry::getKey);
111113
if (host.isPresent()) {
112-
Client client = new ClientImpl(conf, host.get(), true, lock, false);
114+
Client client =
115+
conf.transactionReplay()
116+
? new ClientReplayImpl(conf, host.get(), lock, false)
117+
: new ClientImpl(conf, host.get(), lock, false);
113118
denyList.remove(host.get());
114119
return client;
115120
}

src/main/java/org/mariadb/jdbc/client/context/BaseContext.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,27 @@ public class BaseContext implements Context {
4545
private PrepareCache prepareCache;
4646
private int stateFlag = 0;
4747

48+
protected BaseContext(
49+
long threadId,
50+
byte[] seed,
51+
long serverCapabilities,
52+
Configuration conf,
53+
int serverStatus,
54+
ServerVersion version,
55+
ExceptionFactory exceptionFactory,
56+
PrepareCache prepareCache) {
57+
this.threadId = threadId;
58+
this.seed = seed;
59+
this.serverCapabilities = serverCapabilities;
60+
this.serverStatus = serverStatus;
61+
this.version = version;
62+
this.eofDeprecated = (serverCapabilities & Capabilities.CLIENT_DEPRECATE_EOF) > 0;
63+
this.conf = conf;
64+
this.database = conf.database();
65+
this.exceptionFactory = exceptionFactory;
66+
this.prepareCache = prepareCache;
67+
}
68+
4869
public BaseContext(
4970
InitialHandshakePacket handshake,
5071
Configuration conf,

0 commit comments

Comments
 (0)