44
44
import io .netty .handler .ssl .SslProvider ;
45
45
import java .io .ByteArrayInputStream ;
46
46
import java .io .File ;
47
- import java .util .concurrent .ExecutorService ;
48
- import java .util .concurrent .Executors ;
49
- import java .util .logging .Level ;
47
+ import java .util .concurrent .FutureTask ;
50
48
import java .util .logging .Logger ;
51
49
import javax .net .ssl .SSLException ;
52
50
import javax .net .ssl .SSLSessionContext ;
@@ -127,28 +125,21 @@ public final class IntegrationTest {
127
125
+ "-----END PRIVATE KEY-----" ;
128
126
129
127
private String s2aAddress ;
130
- private int s2aPort ;
131
128
private Server s2aServer ;
132
129
private String s2aDelayAddress ;
133
- private int s2aDelayPort ;
134
130
private Server s2aDelayServer ;
135
131
private String mtlsS2AAddress ;
136
- private int mtlsS2APort ;
137
132
private Server mtlsS2AServer ;
138
- private int serverPort ;
139
133
private String serverAddress ;
140
134
private Server server ;
141
135
142
136
@ Before
143
137
public void setUp () throws Exception {
144
- s2aPort = Utils .pickUnusedPort ();
138
+ s2aServer = ServerBuilder .forPort (0 ).addService (new FakeS2AServer ()).build ().start ();
139
+ int s2aPort = s2aServer .getPort ();
145
140
s2aAddress = "localhost:" + s2aPort ;
146
- s2aServer = ServerBuilder .forPort (s2aPort ).addService (new FakeS2AServer ()).build ();
147
141
logger .info ("S2A service listening on localhost:" + s2aPort );
148
- s2aServer .start ();
149
142
150
- mtlsS2APort = Utils .pickUnusedPort ();
151
- mtlsS2AAddress = "localhost:" + mtlsS2APort ;
152
143
File s2aCert = new File ("src/test/resources/server_cert.pem" );
153
144
File s2aKey = new File ("src/test/resources/server_key.pem" );
154
145
File rootCert = new File ("src/test/resources/root_cert.pem" );
@@ -158,24 +149,25 @@ public void setUp() throws Exception {
158
149
.trustManager (rootCert )
159
150
.clientAuth (TlsServerCredentials .ClientAuth .REQUIRE )
160
151
.build ();
161
- mtlsS2AServer =
162
- NettyServerBuilder .forPort (mtlsS2APort , s2aCreds ).addService (new FakeS2AServer ()).build ();
163
- logger .info ("mTLS S2A service listening on localhost:" + mtlsS2APort );
152
+ mtlsS2AServer = NettyServerBuilder .forPort (0 , s2aCreds ).addService (new FakeS2AServer ()).build ();
164
153
mtlsS2AServer .start ();
154
+ int mtlsS2APort = mtlsS2AServer .getPort ();
155
+ mtlsS2AAddress = "localhost:" + mtlsS2APort ;
156
+ logger .info ("mTLS S2A service listening on localhost:" + mtlsS2APort );
165
157
166
- s2aDelayPort = Utils .pickUnusedPort ();
158
+ int s2aDelayPort = Utils .pickUnusedPort ();
167
159
s2aDelayAddress = "localhost:" + s2aDelayPort ;
168
160
s2aDelayServer = ServerBuilder .forPort (s2aDelayPort ).addService (new FakeS2AServer ()).build ();
169
161
170
- serverPort = Utils .pickUnusedPort ();
171
- serverAddress = "localhost:" + serverPort ;
172
162
server =
173
- NettyServerBuilder .forPort (serverPort )
163
+ NettyServerBuilder .forPort (0 )
174
164
.addService (new SimpleServiceImpl ())
175
165
.sslContext (buildSslContext ())
176
- .build ();
166
+ .build ()
167
+ .start ();
168
+ int serverPort = server .getPort ();
169
+ serverAddress = "localhost:" + serverPort ;
177
170
logger .info ("Simple Service listening on localhost:" + serverPort );
178
- server .start ();
179
171
}
180
172
181
173
@ After
@@ -193,28 +185,23 @@ public void tearDown() throws Exception {
193
185
194
186
@ Test
195
187
public void clientCommunicateUsingS2ACredentials_succeeds () throws Exception {
196
- ExecutorService executor = Executors .newSingleThreadExecutor ();
197
188
ChannelCredentials credentials =
198
189
S2AChannelCredentials .createBuilder (s2aAddress ).setLocalSpiffeId ("test-spiffe-id" ).build ();
199
- ManagedChannel channel =
200
- Grpc .newChannelBuilder (serverAddress , credentials ).executor (executor ).build ();
190
+ ManagedChannel channel = Grpc .newChannelBuilder (serverAddress , credentials ).build ();
201
191
202
- assertThat (doUnaryRpc (executor , channel )).isTrue ();
192
+ assertThat (doUnaryRpc (channel )).isTrue ();
203
193
}
204
194
205
195
@ Test
206
196
public void clientCommunicateUsingS2ACredentialsNoLocalIdentity_succeeds () throws Exception {
207
- ExecutorService executor = Executors .newSingleThreadExecutor ();
208
197
ChannelCredentials credentials = S2AChannelCredentials .createBuilder (s2aAddress ).build ();
209
- ManagedChannel channel =
210
- Grpc .newChannelBuilder (serverAddress , credentials ).executor (executor ).build ();
198
+ ManagedChannel channel = Grpc .newChannelBuilder (serverAddress , credentials ).build ();
211
199
212
- assertThat (doUnaryRpc (executor , channel )).isTrue ();
200
+ assertThat (doUnaryRpc (channel )).isTrue ();
213
201
}
214
202
215
203
@ Test
216
204
public void clientCommunicateUsingMtlsToS2ACredentials_succeeds () throws Exception {
217
- ExecutorService executor = Executors .newSingleThreadExecutor ();
218
205
ChannelCredentials credentials =
219
206
MtlsToS2AChannelCredentials .createBuilder (
220
207
/* s2aAddress= */ mtlsS2AAddress ,
@@ -224,41 +211,24 @@ public void clientCommunicateUsingMtlsToS2ACredentials_succeeds() throws Excepti
224
211
.build ()
225
212
.setLocalSpiffeId ("test-spiffe-id" )
226
213
.build ();
227
- ManagedChannel channel =
228
- Grpc .newChannelBuilder (serverAddress , credentials ).executor (executor ).build ();
214
+ ManagedChannel channel = Grpc .newChannelBuilder (serverAddress , credentials ).build ();
229
215
230
- assertThat (doUnaryRpc (executor , channel )).isTrue ();
216
+ assertThat (doUnaryRpc (channel )).isTrue ();
231
217
}
232
218
233
219
@ Test
234
220
public void clientCommunicateUsingS2ACredentials_s2AdelayStart_succeeds () throws Exception {
235
- DoUnaryRpc doUnaryRpc = new DoUnaryRpc ();
236
- doUnaryRpc .start ();
221
+ ChannelCredentials credentials = S2AChannelCredentials .createBuilder (s2aDelayAddress ).build ();
222
+ ManagedChannel channel = Grpc .newChannelBuilder (serverAddress , credentials ).build ();
223
+
224
+ FutureTask <Boolean > rpc = new FutureTask <>(() -> doUnaryRpc (channel ));
225
+ new Thread (rpc ).start ();
237
226
Thread .sleep (2000 );
238
227
s2aDelayServer .start ();
239
- doUnaryRpc .join ();
240
- }
241
-
242
- private class DoUnaryRpc extends Thread {
243
- @ Override
244
- public void run () {
245
- ExecutorService executor = Executors .newSingleThreadExecutor ();
246
- ChannelCredentials credentials = S2AChannelCredentials .createBuilder (s2aDelayAddress ).build ();
247
- ManagedChannel channel =
248
- Grpc .newChannelBuilder (serverAddress , credentials ).executor (executor ).build ();
249
- boolean result = false ;
250
- try {
251
- result = doUnaryRpc (executor , channel );
252
- } catch (InterruptedException e ) {
253
- logger .log (Level .SEVERE , "Failed to do unary rpc" , e );
254
- result = false ;
255
- }
256
- assertThat (result ).isTrue ();
257
- }
228
+ assertThat (rpc .get ()).isTrue ();
258
229
}
259
230
260
- public static boolean doUnaryRpc (ExecutorService executor , ManagedChannel channel )
261
- throws InterruptedException {
231
+ public static boolean doUnaryRpc (ManagedChannel channel ) throws InterruptedException {
262
232
try {
263
233
SimpleServiceGrpc .SimpleServiceBlockingStub stub =
264
234
SimpleServiceGrpc .newBlockingStub (channel );
@@ -277,8 +247,6 @@ public static boolean doUnaryRpc(ExecutorService executor, ManagedChannel channe
277
247
} finally {
278
248
channel .shutdown ();
279
249
channel .awaitTermination (1 , SECONDS );
280
- executor .shutdown ();
281
- executor .awaitTermination (1 , SECONDS );
282
250
}
283
251
}
284
252
@@ -318,4 +286,4 @@ public void unaryRpc(SimpleRequest request, StreamObserver<SimpleResponse> obser
318
286
observer .onCompleted ();
319
287
}
320
288
}
321
- }
289
+ }
0 commit comments