Skip to content

Commit 862c08b

Browse files
author
diego Dupin
committed
[misc] compression correction for multi-packet
1 parent 0e8c7a0 commit 862c08b

File tree

3 files changed

+74
-27
lines changed

3 files changed

+74
-27
lines changed

src/main/java/org/mariadb/jdbc/client/socket/impl/CompressOutputStream.java

Lines changed: 62 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public class CompressOutputStream extends OutputStream {
1515
private final OutputStream out;
1616
private final MutableInt sequence;
1717
private final byte[] header = new byte[7];
18+
private byte[] longPacketBuffer = null;
1819

1920
public CompressOutputStream(OutputStream out, MutableInt compressionSequence) {
2021
this.out = out;
@@ -46,11 +47,26 @@ public CompressOutputStream(OutputStream out, MutableInt compressionSequence) {
4647
*/
4748
@Override
4849
public void write(byte[] b, int off, int len) throws IOException {
49-
if (len < MIN_COMPRESSION_SIZE) {
50+
if (len + ((longPacketBuffer != null) ? longPacketBuffer.length : 0) < MIN_COMPRESSION_SIZE) {
5051
// *******************************************************************************
5152
// small packet, no compression
5253
// *******************************************************************************
5354

55+
if (longPacketBuffer != null) {
56+
header[0] = (byte) (len + longPacketBuffer.length);
57+
header[1] = (byte) ((len + longPacketBuffer.length) >>> 8);
58+
header[2] = 0;
59+
header[3] = sequence.incrementAndGet();
60+
header[4] = 0;
61+
header[5] = 0;
62+
header[6] = 0;
63+
out.write(header, 0, 7);
64+
out.write(longPacketBuffer, 0, longPacketBuffer.length);
65+
out.write(b, off, len);
66+
longPacketBuffer = null;
67+
return;
68+
}
69+
5470
header[0] = (byte) len;
5571
header[1] = (byte) (len >>> 8);
5672
header[2] = 0;
@@ -62,31 +78,51 @@ public void write(byte[] b, int off, int len) throws IOException {
6278
out.write(b, off, len);
6379

6480
} else {
81+
// *******************************************************************************
82+
// compressing packet
83+
// *******************************************************************************
84+
int sent = 0;
85+
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
86+
try (DeflaterOutputStream deflater = new DeflaterOutputStream(baos)) {
6587

66-
// *******************************************************************************
67-
// compressing packet
68-
// *******************************************************************************
69-
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
70-
try (DeflaterOutputStream deflater = new DeflaterOutputStream(baos)) {
71-
deflater.write(b, off, len);
72-
deflater.finish();
73-
}
88+
/**
89+
* For multi packet, len will be 0x00ffffff + 4 bytes for header.
90+
* but compression can only compress up to 0x00ffffff bytes (header initial length size cannot be > 3 bytes)
91+
* so, for this specific case, a buffer will be
92+
*/
7493

75-
byte[] compressedBytes = baos.toByteArray();
94+
if (longPacketBuffer != null) {
95+
deflater.write(longPacketBuffer, 0, longPacketBuffer.length);
96+
sent = longPacketBuffer.length;
97+
longPacketBuffer = null;
98+
}
99+
if ( len + sent > 0x00ffffff) {
100+
int remaining = len + sent - 0x00ffffff;
101+
longPacketBuffer = new byte[remaining];
102+
System.arraycopy(b, off + 0x00ffffff - sent, longPacketBuffer, 0, remaining);
103+
}
76104

77-
int compressLen = compressedBytes.length;
105+
int bufLenSent = Math.min(0x00ffffff - sent, len);
106+
deflater.write(b, off, bufLenSent);
107+
sent += bufLenSent;
108+
deflater.finish();
109+
}
78110

79-
header[0] = (byte) compressLen;
80-
header[1] = (byte) (compressLen >>> 8);
81-
header[2] = (byte) (compressLen >>> 16);
82-
header[3] = sequence.incrementAndGet();
83-
header[4] = (byte) len;
84-
header[5] = (byte) (len >>> 8);
85-
header[6] = (byte) (len >>> 16);
111+
byte[] compressedBytes = baos.toByteArray();
86112

87-
out.write(header, 0, 7);
88-
out.write(compressedBytes, 0, compressLen);
89-
}
113+
int compressLen = compressedBytes.length;
114+
115+
header[0] = (byte) compressLen;
116+
header[1] = (byte) (compressLen >>> 8);
117+
header[2] = (byte) (compressLen >>> 16);
118+
header[3] = sequence.incrementAndGet();
119+
header[4] = (byte) sent;
120+
header[5] = (byte) (sent >>> 8);
121+
header[6] = (byte) (sent >>> 16);
122+
123+
out.write(header, 0, 7);
124+
out.write(compressedBytes, 0, compressLen);
125+
}
90126
}
91127
}
92128

@@ -107,6 +143,11 @@ public void write(byte[] b, int off, int len) throws IOException {
107143
*/
108144
@Override
109145
public void flush() throws IOException {
146+
if (longPacketBuffer != null) {
147+
byte[] b = longPacketBuffer;
148+
longPacketBuffer = null;
149+
write(b, 0, b.length);
150+
}
110151
out.flush();
111152
sequence.set((byte) -1);
112153
}

src/test/java/org/mariadb/jdbc/integration/CompressTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.junit.jupiter.api.*;
1111
import org.mariadb.jdbc.Connection;
1212
import org.mariadb.jdbc.Statement;
13+
import org.mariadb.jdbc.util.constants.Capabilities;
1314

1415
public class CompressTest extends Common {
1516
private static Connection shareCompressCon;
@@ -42,7 +43,7 @@ public void bigSend() throws SQLException {
4243
}
4344

4445
public void bigSend(Connection con, int maxLen) throws SQLException {
45-
char[] arr2 = new char[Math.min(maxLen, Math.min(16 * 1024 * 1024, getMaxAllowedPacket() / 2))];
46+
char[] arr2 = new char[Math.min(maxLen, Math.min(16 * 1024 * 1024, (getMaxAllowedPacket() / 2) - 1000))];
4647
for (int pos = 0; pos < arr2.length; pos++) {
4748
arr2[pos] = (char) ('A' + (pos % 60));
4849
}

src/test/java/org/mariadb/jdbc/integration/PreparedStatementParametersTest.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -462,16 +462,21 @@ public void bigSend(Connection con, String st) throws SQLException {
462462
@Test
463463
public void bigSendError() throws SQLException {
464464
int maxAllowedPacket = getMaxAllowedPacket();
465-
Assumptions.assumeTrue(maxAllowedPacket < 10 * 1024 * 1024);
466-
char[] arr = new char[10 * 1024 * 1024];
465+
Assumptions.assumeTrue(maxAllowedPacket < 32 * 1024 * 1024);
466+
char[] arr = new char[maxAllowedPacket];
467467
for (int pos = 0; pos < arr.length; pos++) {
468468
arr[pos] = (char) ('A' + (pos % 60));
469469
}
470+
boolean expectClosed = maxAllowedPacket > 16 * 1024 * 1024;
470471
String st = new String(arr);
471-
bigSendError(sharedConn, st, true);
472-
bigSendError(sharedConnBinary, st, true);
472+
try (Connection con = createCon()) {
473+
bigSendError(con, st, expectClosed);
474+
}
475+
try (Connection con = createCon("useServerPrepStmts=true")) {
476+
bigSendError(con, st, expectClosed);
477+
}
473478
try (Connection con = createCon("transactionReplay")) {
474-
bigSendError(con, st, false);
479+
bigSendError(con, st, expectClosed);
475480
}
476481
}
477482

0 commit comments

Comments
 (0)