Skip to content

Commit a177285

Browse files
committed
[CONJ-1115] Make connector become virtual-thread friendly
1 parent 10a1107 commit a177285

File tree

3 files changed

+93
-55
lines changed

3 files changed

+93
-55
lines changed

src/main/java/org/mariadb/jdbc/client/impl/StandardClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public StandardClient(
107107
OutputStream out = new BufferedOutputStream(socket.getOutputStream(), 16384);
108108
InputStream in =
109109
conf.useReadAheadInput()
110-
? new ReadAheadBufferedStream(socket.getInputStream())
110+
? new ReadAheadBufferedStream(socket.getInputStream(), lock)
111111
: new BufferedInputStream(socket.getInputStream(), 16384);
112112

113113
assignStream(out, in, conf, null);
@@ -165,7 +165,7 @@ public StandardClient(
165165
out = new BufferedOutputStream(sslSocket.getOutputStream(), 16384);
166166
in =
167167
conf.useReadAheadInput()
168-
? new ReadAheadBufferedStream(sslSocket.getInputStream())
168+
? new ReadAheadBufferedStream(sslSocket.getInputStream(), lock)
169169
: new BufferedInputStream(sslSocket.getInputStream(), 16384);
170170
assignStream(out, in, conf, handshake.getThreadId());
171171
}
@@ -199,7 +199,7 @@ public StandardClient(
199199
if ((clientCapabilities & Capabilities.COMPRESS) != 0) {
200200
assignStream(
201201
new CompressOutputStream(out, compressionSequence),
202-
new CompressInputStream(in, compressionSequence),
202+
new CompressInputStream(in, compressionSequence, lock),
203203
conf,
204204
handshake.getThreadId());
205205
}

src/main/java/org/mariadb/jdbc/client/socket/impl/CompressInputStream.java

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.io.EOFException;
77
import java.io.IOException;
88
import java.io.InputStream;
9+
import java.util.concurrent.locks.ReentrantLock;
910
import java.util.zip.DataFormatException;
1011
import java.util.zip.Inflater;
1112
import org.mariadb.jdbc.client.util.MutableByte;
@@ -22,17 +23,19 @@ public class CompressInputStream extends InputStream {
2223

2324
private int end;
2425
private int pos;
25-
private byte[] buf;
26+
private volatile byte[] buf;
27+
private final ReentrantLock lock;
2628

2729
/**
2830
* Constructor. When this handler is used, driver expect packet with 7 byte compression header
2931
*
3032
* @param in socket input stream
3133
* @param compressionSequence compression sequence
3234
*/
33-
public CompressInputStream(InputStream in, MutableByte compressionSequence) {
35+
public CompressInputStream(InputStream in, MutableByte compressionSequence, ReentrantLock lock) {
3436
this.in = in;
3537
this.sequence = compressionSequence;
38+
this.lock = lock;
3639
}
3740

3841
/**
@@ -87,19 +90,23 @@ public int read(byte[] b, int off, int len) throws IOException {
8790
}
8891

8992
int totalReads = 0;
93+
lock.lock();
94+
try {
95+
do {
96+
if (end - pos <= 0) {
97+
retrieveBuffer();
98+
}
99+
// copy internal value to buf.
100+
int copyLength = Math.min(len - totalReads, end - pos);
101+
System.arraycopy(buf, pos, b, off + totalReads, copyLength);
102+
pos += copyLength;
103+
totalReads += copyLength;
104+
} while (totalReads < len && super.available() > 0);
90105

91-
do {
92-
if (end - pos <= 0) {
93-
retrieveBuffer();
94-
}
95-
// copy internal value to buf.
96-
int copyLength = Math.min(len - totalReads, end - pos);
97-
System.arraycopy(buf, pos, b, off + totalReads, copyLength);
98-
pos += copyLength;
99-
totalReads += copyLength;
100-
} while (totalReads < len && super.available() > 0);
101-
102-
return totalReads;
106+
return totalReads;
107+
} finally {
108+
lock.unlock();
109+
}
103110
}
104111

105112
private void retrieveBuffer() throws IOException {
@@ -218,7 +225,12 @@ public long skip(long n) throws IOException {
218225
*/
219226
@Override
220227
public int available() throws IOException {
221-
return in.available();
228+
lock.lock();
229+
try {
230+
return in.available();
231+
} finally {
232+
lock.unlock();
233+
}
222234
}
223235

224236
/**
@@ -257,8 +269,13 @@ public void close() throws IOException {
257269
* @see InputStream#reset()
258270
*/
259271
@Override
260-
public synchronized void mark(int readlimit) {
261-
in.mark(readlimit);
272+
public void mark(int readlimit) {
273+
lock.lock();
274+
try {
275+
in.mark(readlimit);
276+
} finally {
277+
lock.unlock();
278+
}
262279
}
263280

264281
/**
@@ -299,8 +316,13 @@ public synchronized void mark(int readlimit) {
299316
* @see IOException
300317
*/
301318
@Override
302-
public synchronized void reset() throws IOException {
303-
in.reset();
319+
public void reset() throws IOException {
320+
lock.lock();
321+
try {
322+
in.reset();
323+
} finally {
324+
lock.unlock();
325+
}
304326
}
305327

306328
/**

src/main/java/org/mariadb/jdbc/client/socket/impl/ReadAheadBufferedStream.java

Lines changed: 49 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.io.FilterInputStream;
77
import java.io.IOException;
88
import java.io.InputStream;
9+
import java.util.concurrent.locks.ReentrantLock;
910

1011
/**
1112
* Permit to buf socket data, reading not only asked bytes, but available number of bytes when
@@ -15,6 +16,7 @@ public class ReadAheadBufferedStream extends FilterInputStream {
1516

1617
private static final int BUF_SIZE = 16384;
1718
private final byte[] buf;
19+
private final ReentrantLock lock;
1820
private int end;
1921
private int pos;
2022

@@ -23,11 +25,16 @@ public class ReadAheadBufferedStream extends FilterInputStream {
2325
*
2426
* @param in socket input stream
2527
*/
26-
public ReadAheadBufferedStream(InputStream in) {
28+
public ReadAheadBufferedStream(InputStream in, ReentrantLock lock) {
2729
super(in);
2830
buf = new byte[BUF_SIZE];
2931
end = 0;
3032
pos = 0;
33+
this.lock = lock;
34+
}
35+
36+
public ReentrantLock getLock() {
37+
return lock;
3138
}
3239

3340
/**
@@ -39,45 +46,49 @@ public ReadAheadBufferedStream(InputStream in) {
3946
* @return number of added bytes
4047
* @throws IOException if exception during socket reading
4148
*/
42-
public synchronized int read(byte[] externalBuf, int off, int len) throws IOException {
49+
public int read(byte[] externalBuf, int off, int len) throws IOException {
4350

4451
if (len == 0) {
4552
return 0;
4653
}
47-
48-
int totalReads = 0;
49-
while (true) {
50-
51-
// read
52-
if (end - pos <= 0) {
53-
if (len - totalReads >= buf.length) {
54-
// buf length is less than asked byte and buf is empty
55-
// => filling directly into external buf
56-
int reads = super.read(externalBuf, off + totalReads, len - totalReads);
57-
if (reads <= 0) {
58-
return (totalReads == 0) ? -1 : totalReads;
59-
}
60-
return totalReads + reads;
61-
62-
} else {
63-
64-
// filling internal buf
65-
fillingBuffer(len - totalReads);
66-
if (end <= 0) {
67-
return (totalReads == 0) ? -1 : totalReads;
54+
lock.lock();
55+
try {
56+
int totalReads = 0;
57+
while (true) {
58+
59+
// read
60+
if (end - pos <= 0) {
61+
if (len - totalReads >= buf.length) {
62+
// buf length is less than asked byte and buf is empty
63+
// => filling directly into external buf
64+
int reads = super.read(externalBuf, off + totalReads, len - totalReads);
65+
if (reads <= 0) {
66+
return (totalReads == 0) ? -1 : totalReads;
67+
}
68+
return totalReads + reads;
69+
70+
} else {
71+
72+
// filling internal buf
73+
fillingBuffer(len - totalReads);
74+
if (end <= 0) {
75+
return (totalReads == 0) ? -1 : totalReads;
76+
}
6877
}
6978
}
70-
}
7179

72-
// copy internal value to buf.
73-
int copyLength = Math.min(len - totalReads, end - pos);
74-
System.arraycopy(buf, pos, externalBuf, off + totalReads, copyLength);
75-
pos += copyLength;
76-
totalReads += copyLength;
80+
// copy internal value to buf.
81+
int copyLength = Math.min(len - totalReads, end - pos);
82+
System.arraycopy(buf, pos, externalBuf, off + totalReads, copyLength);
83+
pos += copyLength;
84+
totalReads += copyLength;
7785

78-
if (totalReads >= len || super.available() <= 0) {
79-
return totalReads;
86+
if (totalReads >= len || super.available() <= 0) {
87+
return totalReads;
88+
}
8089
}
90+
} finally{
91+
lock.unlock();
8192
}
8293
}
8394

@@ -103,8 +114,13 @@ public void close() throws IOException {
103114
pos = 0;
104115
}
105116

106-
public synchronized int available() throws IOException {
107-
return end - pos + super.available();
117+
public int available() throws IOException {
118+
lock.lock();
119+
try {
120+
return end - pos + super.available();
121+
} finally {
122+
lock.unlock();
123+
}
108124
}
109125

110126
public int read() throws IOException {

0 commit comments

Comments
 (0)