Skip to content

Commit 540265c

Browse files
committed
fix server
1 parent a06be73 commit 540265c

File tree

3 files changed

+83
-12
lines changed

3 files changed

+83
-12
lines changed

okhttp/src/main/java/io/grpc/okhttp/OkHttpServerStream.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,13 +208,14 @@ public void runOnTransportThread(final Runnable r) {
208208
* Must be called with holding the transport lock.
209209
*/
210210
@Override
211-
public void inboundDataReceived(okio.Buffer frame, int windowConsumed, boolean endOfStream) {
211+
public void inboundDataReceived(okio.Buffer frame, int dataLength, int paddingLength, boolean endOfStream) {
212212
synchronized (lock) {
213213
PerfMark.event("OkHttpServerTransport$FrameHandler.data", tag);
214214
if (endOfStream) {
215215
this.receivedEndOfStream = true;
216216
}
217-
window -= windowConsumed;
217+
window -= dataLength + paddingLength;
218+
processedWindow -= paddingLength;
218219
super.inboundDataReceived(new OkHttpReadableBuffer(frame), endOfStream);
219220
}
220221
}

okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
2020
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
2121

22+
import com.google.common.annotations.VisibleForTesting;
2223
import com.google.common.base.Preconditions;
2324
import com.google.common.util.concurrent.Futures;
2425
import com.google.common.util.concurrent.ListenableFuture;
@@ -139,6 +140,8 @@ final class OkHttpServerTransport implements ServerTransport,
139140
@GuardedBy("lock")
140141
private Long gracefulShutdownPeriod = null;
141142

143+
private FrameHandler handler;
144+
142145
public OkHttpServerTransport(Config config, Socket bareSocket) {
143146
this.config = Preconditions.checkNotNull(config, "config");
144147
this.socket = Preconditions.checkNotNull(bareSocket, "bareSocket");
@@ -248,8 +251,8 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount
248251
TimeUnit.NANOSECONDS);
249252
}
250253

251-
transportExecutor.execute(
252-
new FrameHandler(variant.newReader(Okio.buffer(Okio.source(socket)), false)));
254+
handler = new FrameHandler(variant.newReader(Okio.buffer(Okio.source(socket)), false));
255+
transportExecutor.execute(handler);
253256
} catch (Error | IOException | RuntimeException ex) {
254257
synchronized (lock) {
255258
if (!handshakeShutdown) {
@@ -261,6 +264,11 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount
261264
}
262265
}
263266

267+
@VisibleForTesting
268+
FrameHandler getHandler() {
269+
return handler;
270+
}
271+
264272
@Override
265273
public void shutdown() {
266274
shutdown(null);
@@ -708,7 +716,7 @@ public void headers(boolean outFinished,
708716
return;
709717
}
710718
// Ignore the trailers, but still half-close the stream
711-
stream.inboundDataReceived(new Buffer(), 0, true);
719+
stream.inboundDataReceived(new Buffer(), 0, 0, true);
712720
return;
713721
}
714722
} else {
@@ -799,7 +807,7 @@ public void headers(boolean outFinished,
799807
listener.streamCreated(streamForApp, method, metadata);
800808
stream.onStreamAllocated();
801809
if (inFinished) {
802-
stream.inboundDataReceived(new Buffer(), 0, inFinished);
810+
stream.inboundDataReceived(new Buffer(), 0, 0, inFinished);
803811
}
804812
}
805813
}
@@ -854,15 +862,15 @@ public void data(boolean inFinished, int streamId, BufferedSource in, int length
854862
"Received DATA for half-closed (remote) stream. RFC7540 section 5.1");
855863
return;
856864
}
857-
if (stream.inboundWindowAvailable() < length) {
865+
if (stream.inboundWindowAvailable() < paddedLength) {
858866
in.skip(length);
859867
streamError(streamId, ErrorCode.FLOW_CONTROL_ERROR,
860868
"Received DATA size exceeded window size. RFC7540 section 6.9");
861869
return;
862870
}
863871
Buffer buf = new Buffer();
864872
buf.write(in.getBuffer(), length);
865-
stream.inboundDataReceived(buf, length, inFinished);
873+
stream.inboundDataReceived(buf, length, paddedLength - length, inFinished);
866874
}
867875

868876
// connection window update
@@ -1065,7 +1073,7 @@ private void respondWithHttpError(
10651073
}
10661074
streams.put(streamId, stream);
10671075
if (inFinished) {
1068-
stream.inboundDataReceived(new Buffer(), 0, true);
1076+
stream.inboundDataReceived(new Buffer(), 0, 0,true);
10691077
}
10701078
frameWriter.headers(streamId, headers);
10711079
outboundFlow.data(
@@ -1123,7 +1131,7 @@ public void onPingTimeout() {
11231131

11241132
interface StreamState {
11251133
/** Must be holding 'lock' when calling. */
1126-
void inboundDataReceived(Buffer frame, int windowConsumed, boolean endOfStream);
1134+
void inboundDataReceived(Buffer frame, int dataLength, int paddingLength, boolean endOfStream);
11271135

11281136
/** Must be holding 'lock' when calling. */
11291137
boolean hasReceivedEndOfStream();
@@ -1160,12 +1168,12 @@ static class Http2ErrorStreamState implements StreamState, OutboundFlowControlle
11601168
@Override public void onSentBytes(int frameBytes) {}
11611169

11621170
@Override public void inboundDataReceived(
1163-
Buffer frame, int windowConsumed, boolean endOfStream) {
1171+
Buffer frame, int dataLength, int paddingLength, boolean endOfStream) {
11641172
synchronized (lock) {
11651173
if (endOfStream) {
11661174
receivedEndOfStream = true;
11671175
}
1168-
window -= windowConsumed;
1176+
window -= dataLength + paddingLength;
11691177
try {
11701178
frame.skip(frame.size()); // Recycle segments
11711179
} catch (IOException ex) {

okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import static org.mockito.Mockito.doAnswer;
3232
import static org.mockito.Mockito.mock;
3333
import static org.mockito.Mockito.never;
34+
import static org.mockito.Mockito.reset;
3435
import static org.mockito.Mockito.timeout;
3536
import static org.mockito.Mockito.verify;
3637

@@ -60,6 +61,7 @@
6061
import java.net.ServerSocket;
6162
import java.net.Socket;
6263
import java.util.ArrayDeque;
64+
import java.util.ArrayList;
6365
import java.util.Arrays;
6466
import java.util.Deque;
6567
import java.util.List;
@@ -998,6 +1000,61 @@ public void httpErrorsAdhereToFlowControl() throws Exception {
9981000
shutdownAndTerminate(/*lastStreamId=*/ 1);
9991001
}
10001002

1003+
@Test
1004+
public void windowUpdate() throws Exception {
1005+
initTransport();
1006+
handshake();
1007+
OkHttpServerTransport.FrameHandler handler = serverTransport.getHandler();
1008+
List<Header> headers = Arrays.asList(
1009+
HTTP_SCHEME_HEADER,
1010+
METHOD_HEADER,
1011+
new Header(Header.TARGET_AUTHORITY, "example.com:80"),
1012+
new Header(Header.TARGET_PATH, "/com.example/SimpleService.doit"),
1013+
CONTENT_TYPE_HEADER,
1014+
TE_HEADER,
1015+
new Header("some-metadata", "this could be anything"));
1016+
1017+
handler.headers(false, false, 1, 0, new ArrayList<>(headers), HeadersMode.HTTP_20_HEADERS);
1018+
MockStreamListener streamListener1 = mockTransportListener.newStreams.pop();
1019+
handler.headers(false, false, 3, 0, new ArrayList<>(headers), HeadersMode.HTTP_20_HEADERS);
1020+
MockStreamListener streamListener2 = mockTransportListener.newStreams.pop();
1021+
reset(clientFramesRead);
1022+
1023+
int messageSize = INITIAL_WINDOW_SIZE / 4 ;
1024+
int paddingLength = 10;
1025+
Buffer requestMessageFrame = createMessageFrame(new String(new char[messageSize]), paddingLength);
1026+
int frameSize = (int) requestMessageFrame.size();
1027+
1028+
handler.data(false, 1, requestMessageFrame, frameSize - paddingLength, frameSize);
1029+
1030+
requestMessageFrame = createMessageFrame(new String(new char[messageSize]), paddingLength);
1031+
handler.data(false, 3, requestMessageFrame, frameSize - paddingLength, frameSize);
1032+
1033+
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
1034+
verify(clientFramesRead).windowUpdate(0, frameSize * 2);
1035+
1036+
requestMessageFrame = createMessageFrame(new String(new char[messageSize]), 0);
1037+
handler.data(false, 3, requestMessageFrame, frameSize - paddingLength, frameSize - paddingLength);
1038+
1039+
streamListener1.stream.request(1);
1040+
streamListener2.stream.request(2);
1041+
1042+
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
1043+
verify(clientFramesRead).windowUpdate(3, frameSize * 2 - paddingLength);
1044+
1045+
1046+
paddingLength = 2 * messageSize + 100;
1047+
requestMessageFrame = createMessageFrame(new String(new char[messageSize]), paddingLength);
1048+
frameSize = (int) requestMessageFrame.size();
1049+
handler.data(false, 1, requestMessageFrame, frameSize - paddingLength, frameSize);
1050+
1051+
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
1052+
verify(clientFramesRead).rstStream(eq(1), eq(ErrorCode.FLOW_CONTROL_ERROR));
1053+
1054+
handler.rstStream(3, ErrorCode.CANCEL);
1055+
shutdownAndTerminate(3);
1056+
}
1057+
10011058
@Test
10021059
public void dataForStream0_failsWithGoAway() throws Exception {
10031060
initTransport();
@@ -1220,11 +1277,16 @@ private void handshake(Settings settings) throws Exception {
12201277
}
12211278

12221279
private static Buffer createMessageFrame(String stringMessage) {
1280+
return createMessageFrame(stringMessage, 0);
1281+
}
1282+
1283+
private static Buffer createMessageFrame(String stringMessage, int paddingLength) {
12231284
byte[] message = stringMessage.getBytes(UTF_8);
12241285
Buffer buffer = new Buffer();
12251286
buffer.writeByte(0 /* UNCOMPRESSED */);
12261287
buffer.writeInt(message.length);
12271288
buffer.write(message);
1289+
buffer.write(new byte[paddingLength]);
12281290
return buffer;
12291291
}
12301292

0 commit comments

Comments
 (0)