Skip to content

Commit cdd81d2

Browse files
author
nmittler
committed
Allowing Netty TLS bootstrap handler to be sharable.
Fixes #504
1 parent 728c8b7 commit cdd81d2

File tree

14 files changed

+376
-94
lines changed

14 files changed

+376
-94
lines changed

benchmarks/src/main/java/io/grpc/benchmarks/qps/ClientConfiguration.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,11 @@
4141

4242
import io.grpc.testing.PayloadType;
4343
import io.grpc.testing.RpcType;
44+
import io.grpc.testing.TestUtils;
4445
import io.grpc.transport.netty.NettyChannelBuilder;
4546

46-
import java.net.InetAddress;
4747
import java.net.InetSocketAddress;
4848
import java.net.SocketAddress;
49-
import java.net.UnknownHostException;
5049
import java.util.Collection;
5150
import java.util.Collections;
5251
import java.util.LinkedHashSet;
@@ -56,7 +55,6 @@
5655
* Configuration options for benchmark clients.
5756
*/
5857
class ClientConfiguration implements Configuration {
59-
private static final String TESTCA_HOST = "foo.test.google.fr";
6058
private static final ClientConfiguration DEFAULT = new ClientConfiguration();
6159

6260
Transport transport = Transport.NETTY_NIO;
@@ -117,14 +115,9 @@ protected ClientConfiguration build0(ClientConfiguration config) {
117115

118116
if (config.testca && config.address instanceof InetSocketAddress) {
119117
// Override the socket address with the host from the testca.
120-
try {
121-
InetSocketAddress prevAddress = (InetSocketAddress) config.address;
122-
InetAddress inetAddress = InetAddress.getByName(prevAddress.getHostName());
123-
inetAddress = InetAddress.getByAddress(TESTCA_HOST, inetAddress.getAddress());
124-
config.address = new InetSocketAddress(inetAddress, prevAddress.getPort());
125-
} catch (UnknownHostException e) {
126-
throw new RuntimeException(e);
127-
}
118+
InetSocketAddress address = (InetSocketAddress) config.address;
119+
config.address = TestUtils.testServerAddress(address.getHostName(),
120+
address.getPort());
128121
}
129122
}
130123

build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,9 @@ subprojects {
252252
test {
253253
testLogging {
254254
exceptionFormat = 'full'
255+
showExceptions true
256+
showCauses true
257+
showStackTraces true
255258
}
256259
}
257260
}

interop-testing/build.gradle

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,6 @@ dependencies {
2626

2727
test {
2828
jvmArgs "-Xbootclasspath/p:" + configurations.alpnboot.asPath
29-
testLogging {
30-
exceptionFormat "full"
31-
showExceptions true
32-
showCauses true
33-
showStackTraces true
34-
}
3529
}
3630

3731
task test_client(type: CreateStartScripts) {

interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,6 @@
4343
import org.junit.runners.JUnit4;
4444

4545
import java.io.IOException;
46-
import java.net.InetAddress;
47-
import java.net.InetSocketAddress;
4846

4947
/**
5048
* Integration tests for GRPC over HTTP2 using the Netty framework.
@@ -73,10 +71,8 @@ public static void stopServer() {
7371
@Override
7472
protected ChannelImpl createChannel() {
7573
try {
76-
InetAddress address
77-
= InetAddress.getByAddress("foo.test.google.fr", new byte[] {127, 0, 0, 1});
7874
return NettyChannelBuilder
79-
.forAddress(new InetSocketAddress(address, serverPort))
75+
.forAddress(TestUtils.testServerAddress(serverPort))
8076
.sslContext(GrpcSslContexts.forClient().trustManager(
8177
TestUtils.loadCert("ca.pem")).build())
8278
.build();

interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public static void stopServer() throws Exception {
7171
@Override
7272
protected ChannelImpl createChannel() {
7373
OkHttpChannelBuilder builder = OkHttpChannelBuilder.forAddress("127.0.0.1", serverPort)
74-
.overrideHostForAuthority("foo.test.google.fr");
74+
.overrideHostForAuthority(TestUtils.TEST_SERVER_HOST);
7575
try {
7676
builder.sslSocketFactory(TestUtils.getSslSocketFactoryForCertainCert(
7777
TestUtils.loadCert("ca.pem")));

netty/build.gradle

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@ dependencies {
55
libraries.netty
66

77
// Tests depend on base class defined by core module.
8-
testCompile project(':grpc-core').sourceSets.test.output
8+
testCompile project(':grpc-core').sourceSets.test.output,
9+
project(':grpc-testing')
10+
}
11+
12+
test {
13+
jvmArgs "-Xbootclasspath/p:" + configurations.alpnboot.asPath
914
}
1015

1116
javadoc.options.links 'https://netty.io/4.1/api/'

netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,11 @@
3737

3838
import io.grpc.Metadata;
3939
import io.grpc.MethodDescriptor;
40+
import io.grpc.Status;
4041
import io.grpc.transport.ClientStream;
4142
import io.grpc.transport.ClientStreamListener;
4243
import io.grpc.transport.ClientTransport;
44+
4345
import io.netty.bootstrap.Bootstrap;
4446
import io.netty.channel.Channel;
4547
import io.netty.channel.ChannelFuture;
@@ -123,16 +125,27 @@ public ClientStream newStream(MethodDescriptor<?, ?> method, Metadata.Headers he
123125
Preconditions.checkNotNull(listener, "listener");
124126

125127
// Create the stream.
126-
NettyClientStream stream = new NettyClientStream(listener, channel, handler);
128+
final NettyClientStream stream = new NettyClientStream(listener, channel, handler);
127129

128130
// Convert the headers into Netty HTTP/2 headers.
129131
AsciiString defaultPath = new AsciiString("/" + method.getName());
130132
Http2Headers http2Headers = Utils.convertClientHeaders(headers, negotiationHandler.scheme(),
131133
defaultPath, authority);
132134

135+
ChannelFutureListener failureListener = new ChannelFutureListener() {
136+
@Override
137+
public void operationComplete(ChannelFuture future) throws Exception {
138+
if (!future.isSuccess()) {
139+
// Stream creation failed. Close the stream if not already closed.
140+
stream.transportReportStatus(Status.fromThrowable(future.cause()), true,
141+
new Metadata.Trailers());
142+
}
143+
}
144+
};
145+
133146
// Write the command requesting the creation of the stream.
134147
handler.getWriteQueue().enqueue(new CreateStreamCommand(http2Headers, stream),
135-
!method.getType().clientSendsOneMessage());
148+
!method.getType().clientSendsOneMessage()).addListener(failureListener);
136149
return stream;
137150
}
138151

@@ -159,19 +172,7 @@ public void start(Listener transportListener) {
159172
channel.closeFuture().addListener(new ChannelFutureListener() {
160173
@Override
161174
public void operationComplete(ChannelFuture future) throws Exception {
162-
if (!future.isSuccess()) {
163-
// The close failed. Just notify that transport shutdown failed.
164-
notifyTerminated(future.cause());
165-
return;
166-
}
167-
168-
if (handler.connectionError() != null) {
169-
// The handler encountered a connection error.
170-
notifyTerminated(handler.connectionError());
171-
} else {
172-
// Normal termination of the connection.
173-
notifyTerminated(null);
174-
}
175+
notifyTerminated(handler.connectionError());
175176
}
176177
});
177178
}

netty/src/main/java/io/grpc/transport/netty/NettyServerTransport.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -90,15 +90,7 @@ public void start(ServerTransportListener listener) {
9090
channel.closeFuture().addListener(new ChannelFutureListener() {
9191
@Override
9292
public void operationComplete(ChannelFuture future) throws Exception {
93-
if (!future.isSuccess()) {
94-
notifyTerminated(future.cause());
95-
} else if (handler.connectionError() != null) {
96-
// The handler encountered a connection error.
97-
notifyTerminated(handler.connectionError());
98-
} else {
99-
// Normal termination of the connection.
100-
notifyTerminated(null);
101-
}
93+
notifyTerminated(handler.connectionError());
10294
}
10395
});
10496

netty/src/main/java/io/grpc/transport/netty/ProtocolNegotiators.java

Lines changed: 50 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import com.google.common.base.Preconditions;
3535
import com.google.common.util.concurrent.SettableFuture;
3636

37+
import io.grpc.Status;
38+
3739
import io.netty.channel.Channel;
3840
import io.netty.channel.ChannelDuplexHandler;
3941
import io.netty.channel.ChannelHandler;
@@ -116,31 +118,33 @@ public static ProtocolNegotiator tls(final SslContext sslContext,
116118
Preconditions.checkNotNull(sslContext, "sslContext");
117119
Preconditions.checkNotNull(inetAddress, "inetAddress");
118120

119-
final ChannelHandler sslBootstrapHandler = new ChannelHandlerAdapter() {
121+
return new ProtocolNegotiator() {
120122
@Override
121-
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
122-
// TODO(nmittler): This method is currently unsupported for OpenSSL. Need to fix in Netty.
123-
SSLEngine sslEngine = sslContext.newEngine(ctx.alloc(),
124-
inetAddress.getHostName(), inetAddress.getPort());
125-
SSLParameters sslParams = new SSLParameters();
126-
sslParams.setEndpointIdentificationAlgorithm("HTTPS");
127-
sslEngine.setSSLParameters(sslParams);
128-
129-
final SettableFuture<Void> completeFuture = SettableFuture.create();
130-
if (isOpenSsl(sslContext.getClass())) {
131-
completeFuture.set(null);
132-
} else {
133-
// Using JDK SSL
134-
if (!installJettyTlsProtocolSelection(sslEngine, completeFuture, false)) {
135-
throw new IllegalStateException("NPN/ALPN extensions not installed");
136-
}
137-
}
123+
public Handler newHandler(Http2ConnectionHandler handler) {
124+
ChannelHandler sslBootstrap = new ChannelHandlerAdapter() {
125+
@Override
126+
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
127+
// TODO(nmittler): Unsupported for OpenSSL in Netty < 4.1.Beta6.
128+
SSLEngine sslEngine = sslContext.newEngine(ctx.alloc(),
129+
inetAddress.getHostName(), inetAddress.getPort());
130+
SSLParameters sslParams = new SSLParameters();
131+
sslParams.setEndpointIdentificationAlgorithm("HTTPS");
132+
sslEngine.setSSLParameters(sslParams);
133+
134+
final SettableFuture<Void> completeFuture = SettableFuture.create();
135+
if (isOpenSsl(sslContext.getClass())) {
136+
completeFuture.set(null);
137+
} else {
138+
// Using JDK SSL
139+
if (!installJettyTlsProtocolSelection(sslEngine, completeFuture, false)) {
140+
throw new IllegalStateException("NPN/ALPN extensions not installed");
141+
}
142+
}
138143

139-
SslHandler sslHandler = new SslHandler(sslEngine, false);
140-
sslHandler.handshakeFuture().addListener(
141-
new GenericFutureListener<Future<? super Channel>>() {
144+
SslHandler sslHandler = new SslHandler(sslEngine, false);
145+
sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
142146
@Override
143-
public void operationComplete(Future<? super Channel> future) throws Exception {
147+
public void operationComplete(Future<Channel> future) throws Exception {
144148
// If an error occurred during the handshake, throw it to the pipeline.
145149
if (future.isSuccess()) {
146150
completeFuture.get();
@@ -149,13 +153,10 @@ public void operationComplete(Future<? super Channel> future) throws Exception {
149153
}
150154
}
151155
});
152-
ctx.pipeline().replace(this, "sslHandler", sslHandler);
153-
}
154-
};
155-
return new ProtocolNegotiator() {
156-
@Override
157-
public Handler newHandler(Http2ConnectionHandler handler) {
158-
return new BufferUntilTlsNegotiatedHandler(sslBootstrapHandler, handler);
156+
ctx.pipeline().replace(this, null, sslHandler);
157+
}
158+
};
159+
return new BufferUntilTlsNegotiatedHandler(sslBootstrap, handler);
159160
}
160161
};
161162
}
@@ -190,6 +191,10 @@ public Handler newHandler(Http2ConnectionHandler handler) {
190191
};
191192
}
192193

194+
private static RuntimeException unavailableException(String msg) {
195+
return Status.UNAVAILABLE.withDescription(msg).asRuntimeException();
196+
}
197+
193198
/**
194199
* Returns {@code true} if the given class is for use with Netty OpenSsl.
195200
*/
@@ -199,7 +204,7 @@ private static boolean isOpenSsl(Class<?> clazz) {
199204

200205
/**
201206
* Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or
202-
* {@link #failBufferedAndClose(ChannelHandlerContext)} is called. This handler allows us to
207+
* {@link #fail(ChannelHandlerContext, Throwable)} is called. This handler allows us to
203208
* write to a {@link Channel} before we are allowed to write to it officially i.e.
204209
* before it's active or the TLS Handshake is complete.
205210
*/
@@ -231,9 +236,14 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
231236
super.channelRegistered(ctx);
232237
}
233238

239+
@Override
240+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
241+
fail(ctx, cause);
242+
}
243+
234244
@Override
235245
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
236-
failBufferedAndClose(ctx);
246+
fail(ctx, unavailableException("Connection broken while performing protocol negotiation"));
237247
super.channelInactive(ctx);
238248
}
239249

@@ -275,26 +285,28 @@ public void flush(ChannelHandlerContext ctx) {
275285

276286
@Override
277287
public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
278-
failBufferedAndClose(ctx);
288+
fail(ctx, unavailableException("Channel closed while performing protocol negotiation"));
279289
}
280290

281-
protected void failBufferedAndClose(ChannelHandlerContext ctx) {
291+
protected final void fail(ChannelHandlerContext ctx, Throwable cause) {
282292
if (bufferedWrites != null) {
283-
Exception e = new Exception("Buffered write failed.");
284293
while (!bufferedWrites.isEmpty()) {
285294
ChannelWrite write = bufferedWrites.poll();
286-
write.promise.setFailure(e);
295+
write.promise.setFailure(cause);
287296
}
288297
bufferedWrites = null;
289298
}
299+
300+
log.log(Level.SEVERE, "Transport failed during protocol negotiation", cause);
301+
290302
/**
291303
* In case something goes wrong ensure that the channel gets closed as the
292304
* NettyClientTransport relies on the channel's close future to get completed.
293305
*/
294306
ctx.close();
295307
}
296308

297-
protected void writeBufferedAndRemove(ChannelHandlerContext ctx) {
309+
protected final void writeBufferedAndRemove(ChannelHandlerContext ctx) {
298310
if (!ctx.channel().isActive() || writing) {
299311
return;
300312
}
@@ -349,7 +361,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
349361
if (handshakeEvent.isSuccess()) {
350362
writeBufferedAndRemove(ctx);
351363
} else {
352-
failBufferedAndClose(ctx);
364+
fail(ctx, handshakeEvent.cause());
353365
}
354366
}
355367
super.userEventTriggered(ctx, evt);
@@ -413,8 +425,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
413425
if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) {
414426
writeBufferedAndRemove(ctx);
415427
} else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) {
416-
failBufferedAndClose(ctx);
417-
ctx.pipeline().fireExceptionCaught(new Exception("HTTP/2 upgrade rejected"));
428+
fail(ctx, unavailableException("HTTP/2 upgrade rejected"));
418429
}
419430
super.userEventTriggered(ctx, evt);
420431
}

netty/src/main/java/io/grpc/transport/netty/WriteQueue.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.google.common.base.Preconditions;
3535

3636
import io.netty.channel.Channel;
37+
import io.netty.channel.ChannelFuture;
3738
import io.netty.channel.ChannelPromise;
3839

3940
import java.util.ArrayDeque;
@@ -92,8 +93,8 @@ void scheduleFlush() {
9293
* @param flush true if a flush of the write should be schedule, false if a later call to
9394
* enqueue will schedule the flush.
9495
*/
95-
void enqueue(Object command, boolean flush) {
96-
enqueue(command, channel.newPromise(), flush);
96+
ChannelFuture enqueue(Object command, boolean flush) {
97+
return enqueue(command, channel.newPromise(), flush);
9798
}
9899

99100
/**
@@ -104,11 +105,12 @@ void enqueue(Object command, boolean flush) {
104105
* @param flush true if a flush of the write should be schedule, false if a later call to
105106
* enqueue will schedule the flush.
106107
*/
107-
void enqueue(Object command, ChannelPromise promise, boolean flush) {
108+
ChannelFuture enqueue(Object command, ChannelPromise promise, boolean flush) {
108109
queue.add(new QueuedCommand(command, promise));
109110
if (flush) {
110111
scheduleFlush();
111112
}
113+
return promise;
112114
}
113115

114116
/**

0 commit comments

Comments
 (0)