Skip to content

Commit 368fb57

Browse files
author
oleksiys
committed
[master] + integrate Grizzly 2.3.12
1 parent 3b4aa0c commit 368fb57

File tree

3 files changed

+325
-19
lines changed

3 files changed

+325
-19
lines changed

providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/GrizzlyAsyncHttpProvider.java

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2012 Sonatype, Inc. All rights reserved.
2+
* Copyright (c) 2012-2014 Sonatype, Inc. All rights reserved.
33
*
44
* This program is licensed to you under the Apache License Version 2.0,
55
* and you may not use this file except in compliance with the Apache License Version 2.0.
@@ -73,6 +73,7 @@
7373
import java.util.concurrent.ExecutorService;
7474
import java.util.concurrent.TimeUnit;
7575
import java.util.concurrent.TimeoutException;
76+
import org.glassfish.grizzly.spdy.SpdyVersion;
7677

7778
/**
7879
* A Grizzly 2.0-based implementation of {@link AsyncHttpProvider}.
@@ -441,8 +442,9 @@ void timeout(final Connection c) {
441442
// ---------------------------------------------------------- Nested Classes
442443

443444
private static final class ProtocolNegotiator implements ClientSideNegotiator {
445+
private static final SpdyVersion[] SUPPORTED_SPDY_VERSIONS =
446+
{SpdyVersion.SPDY_3_1, SpdyVersion.SPDY_3};
444447

445-
private static final String SPDY = "spdy/3";
446448
private static final String HTTP = "HTTP/1.1";
447449

448450
private final FilterChain spdyFilterChain;
@@ -465,23 +467,31 @@ public boolean wantNegotiate(SSLEngine engine) {
465467
}
466468

467469
@Override
468-
public String selectProtocol(SSLEngine engine, LinkedHashSet<String> strings) {
469-
GrizzlyAsyncHttpProvider.LOGGER.info("ProtocolSelector::selectProtocol: " + strings);
470+
public String selectProtocol(SSLEngine engine, LinkedHashSet<String> protocols) {
471+
GrizzlyAsyncHttpProvider.LOGGER.info("ProtocolSelector::selectProtocol: " + protocols);
470472
final Connection connection = NextProtoNegSupport.getConnection(engine);
471473

472-
// Give preference to SPDY/3. If not available, check for HTTP as a
473-
// fallback
474-
if (strings.contains(SPDY)) {
475-
GrizzlyAsyncHttpProvider.LOGGER.info("ProtocolSelector::selecting: " + SPDY);
476-
SSLConnectionContext sslCtx = SSLUtils.getSslConnectionContext(connection);
477-
sslCtx.setNewConnectionFilterChain(spdyFilterChain);
478-
final SpdySession spdySession = new SpdySession(connection, false, spdyHandlerFilter);
479-
spdySession.setLocalInitialWindowSize(spdyHandlerFilter.getInitialWindowSize());
480-
spdySession.setLocalMaxConcurrentStreams(spdyHandlerFilter.getMaxConcurrentStreams());
481-
Utils.setSpdyConnection(connection);
482-
SpdySession.bind(connection, spdySession);
483-
return SPDY;
484-
} else if (strings.contains(HTTP)) {
474+
// Give preference to SPDY/3.1 or SPDY/3. If not available, check for HTTP as a
475+
// fallback
476+
for (SpdyVersion version : SUPPORTED_SPDY_VERSIONS) {
477+
final String versionDef = version.toString();
478+
if (protocols.contains(versionDef)) {
479+
GrizzlyAsyncHttpProvider.LOGGER.info("ProtocolSelector::selecting: " + versionDef);
480+
SSLConnectionContext sslCtx = SSLUtils.getSslConnectionContext(connection);
481+
sslCtx.setNewConnectionFilterChain(spdyFilterChain);
482+
final SpdySession spdySession =
483+
version.newSession(connection, false, spdyHandlerFilter);
484+
485+
spdySession.setLocalStreamWindowSize(spdyHandlerFilter.getInitialWindowSize());
486+
spdySession.setLocalMaxConcurrentStreams(spdyHandlerFilter.getMaxConcurrentStreams());
487+
Utils.setSpdyConnection(connection);
488+
SpdySession.bind(connection, spdySession);
489+
490+
return versionDef;
491+
}
492+
}
493+
494+
if (protocols.contains(HTTP)) {
485495
GrizzlyAsyncHttpProvider.LOGGER.info("ProtocolSelector::selecting: " + HTTP);
486496
// Use the default HTTP FilterChain.
487497
return HTTP;

providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/filters/AsyncHttpClientFilter.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,21 +281,25 @@ private boolean sendAsGrizzlyRequest(final RequestInfoHolder requestInfoHolder,
281281
sendingCtx = checkAndHandleFilterChainUpdate(ctx, sendingCtx);
282282
}
283283
final Connection c = ctx.getConnection();
284+
final HttpContext httpCtx;
284285
if (!Utils.isSpdyConnection(c)) {
285-
HttpContext.newInstance(ctx, c, c, c);
286+
httpCtx = HttpContext.newInstance(c, c, c, requestPacketLocal);
286287
} else {
287288
SpdySession session = SpdySession.get(c);
288289
final Lock lock = session.getNewClientStreamLock();
289290
try {
290291
lock.lock();
291292
SpdyStream stream = session.openStream(requestPacketLocal, session.getNextLocalStreamId(), 0, 0, 0, false,
292293
!requestPacketLocal.isExpectContent());
293-
HttpContext.newInstance(ctx, stream, stream, stream);
294+
httpCtx = HttpContext.newInstance(stream, stream, stream, requestPacketLocal);
294295
} finally {
295296
lock.unlock();
296297
}
297298
}
299+
httpCtx.attach(ctx);
298300
HttpTxContext.set(ctx, httpTxContext);
301+
requestPacketLocal.getProcessingState().setHttpContext(httpCtx);
302+
299303
return sendRequest(sendingCtx, request, requestPacketLocal);
300304
}
301305

Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
/*
2+
* Copyright (c) 2013 Sonatype, Inc. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
14+
package org.asynchttpclient.providers.grizzly;
15+
16+
import org.asynchttpclient.AsyncCompletionHandler;
17+
import org.asynchttpclient.AsyncHttpClient;
18+
import org.asynchttpclient.AsyncHttpClientConfig;
19+
import org.asynchttpclient.RequestBuilder;
20+
import org.glassfish.grizzly.Buffer;
21+
import org.glassfish.grizzly.http.server.HttpHandler;
22+
import org.glassfish.grizzly.http.server.HttpServer;
23+
import org.glassfish.grizzly.http.server.NetworkListener;
24+
import org.glassfish.grizzly.http.server.Request;
25+
import org.glassfish.grizzly.http.server.Response;
26+
import org.glassfish.grizzly.memory.Buffers;
27+
import org.glassfish.grizzly.ssl.SSLContextConfigurator;
28+
import org.glassfish.grizzly.ssl.SSLEngineConfigurator;
29+
import org.glassfish.grizzly.utils.Charsets;
30+
import org.testng.annotations.AfterTest;
31+
import org.testng.annotations.BeforeTest;
32+
import org.testng.annotations.Test;
33+
34+
import java.io.File;
35+
import java.io.FileInputStream;
36+
import java.io.FileOutputStream;
37+
import java.io.IOException;
38+
import java.io.InputStream;
39+
import java.net.URL;
40+
import java.util.Random;
41+
import java.util.concurrent.CountDownLatch;
42+
import java.util.concurrent.ExecutorService;
43+
import java.util.concurrent.Executors;
44+
import java.util.concurrent.TimeUnit;
45+
import org.asynchttpclient.DefaultAsyncHttpClient;
46+
47+
import static org.glassfish.grizzly.http.server.NetworkListener.DEFAULT_NETWORK_HOST;
48+
import static org.glassfish.grizzly.memory.MemoryManager.DEFAULT_MEMORY_MANAGER;
49+
import static org.testng.Assert.assertNull;
50+
import static org.testng.Assert.fail;
51+
import static org.testng.AssertJUnit.assertEquals;
52+
53+
public class GrizzlyFeedableBodyGeneratorTest {
54+
55+
private static final byte[] DATA =
56+
"aAbBcCdDeEfFgGhHiIjJkKlLmMnNoOpPqQrRsStTuUvVwWxXyYzZ".getBytes(Charsets.ASCII_CHARSET);
57+
private static final int TEMP_FILE_SIZE = 2 * 1024 * 1024;
58+
private static final int NON_SECURE_PORT = 9991;
59+
private static final int SECURE_PORT = 9992;
60+
61+
62+
private HttpServer server;
63+
private File tempFile;
64+
65+
66+
// ------------------------------------------------------------------- Setup
67+
68+
69+
@BeforeTest
70+
public void setup() throws Exception {
71+
generateTempFile();
72+
server = new HttpServer();
73+
NetworkListener nonSecure =
74+
new NetworkListener("nonsecure",
75+
DEFAULT_NETWORK_HOST,
76+
NON_SECURE_PORT);
77+
NetworkListener secure =
78+
new NetworkListener("secure",
79+
DEFAULT_NETWORK_HOST,
80+
SECURE_PORT);
81+
secure.setSecure(true);
82+
secure.setSSLEngineConfig(createSSLConfig());
83+
server.addListener(nonSecure);
84+
server.addListener(secure);
85+
server.getServerConfiguration().addHttpHandler(new ConsumingHandler(), "/test");
86+
server.start();
87+
}
88+
89+
90+
// --------------------------------------------------------------- Tear Down
91+
92+
93+
@AfterTest
94+
public void tearDown() {
95+
if (!tempFile.delete()) {
96+
tempFile.deleteOnExit();
97+
}
98+
tempFile = null;
99+
server.shutdownNow();
100+
server = null;
101+
}
102+
103+
104+
// ------------------------------------------------------------ Test Methods
105+
106+
107+
@Test
108+
public void testSimpleFeederMultipleThreads() throws Exception {
109+
doSimpleFeeder(false);
110+
}
111+
112+
@Test
113+
public void testSimpleFeederOverSSLMultipleThreads() throws Exception {
114+
doSimpleFeeder(true);
115+
}
116+
117+
118+
// --------------------------------------------------------- Private Methods
119+
120+
121+
private void doSimpleFeeder(final boolean secure) {
122+
final int threadCount = 10;
123+
final CountDownLatch latch = new CountDownLatch(threadCount);
124+
final int port = (secure ? SECURE_PORT : NON_SECURE_PORT);
125+
final String scheme = (secure ? "https" : "http");
126+
ExecutorService service = Executors.newFixedThreadPool(threadCount);
127+
128+
AsyncHttpClientConfig config = new AsyncHttpClientConfig.Builder()
129+
.setMaximumConnectionsPerHost(60)
130+
.setMaximumConnectionsTotal(60)
131+
.build();
132+
final AsyncHttpClient client =
133+
new DefaultAsyncHttpClient(new GrizzlyAsyncHttpProvider(config), config);
134+
final int[] statusCodes = new int[threadCount];
135+
final int[] totalsReceived = new int[threadCount];
136+
final Throwable[] errors = new Throwable[threadCount];
137+
for (int i = 0; i < threadCount; i++) {
138+
final int idx = i;
139+
service.execute(new Runnable() {
140+
@Override
141+
public void run() {
142+
FeedableBodyGenerator generator =
143+
new FeedableBodyGenerator();
144+
FeedableBodyGenerator.SimpleFeeder simpleFeeder =
145+
new FeedableBodyGenerator.SimpleFeeder(generator) {
146+
@Override
147+
public void flush() throws IOException {
148+
FileInputStream in = null;
149+
try {
150+
final byte[] bytesIn = new byte[2048];
151+
in = new FileInputStream(tempFile);
152+
int read;
153+
while ((read = in.read(bytesIn)) != -1) {
154+
final Buffer b =
155+
Buffers.wrap(
156+
DEFAULT_MEMORY_MANAGER,
157+
bytesIn,
158+
0,
159+
read);
160+
feed(b, false);
161+
}
162+
feed(Buffers.EMPTY_BUFFER, true);
163+
} finally {
164+
if (in != null) {
165+
try {
166+
in.close();
167+
} catch (IOException ignored) {
168+
}
169+
}
170+
}
171+
}
172+
};
173+
generator.setFeeder(simpleFeeder);
174+
generator.setMaxPendingBytes(10000);
175+
176+
RequestBuilder builder = new RequestBuilder("POST");
177+
builder.setUrl(scheme + "://localhost:" + port + "/test");
178+
builder.setBody(generator);
179+
try {
180+
client.executeRequest(builder.build(),
181+
new AsyncCompletionHandler<org.asynchttpclient.Response>() {
182+
@Override
183+
public org.asynchttpclient.Response onCompleted(org.asynchttpclient.Response response)
184+
throws Exception {
185+
try {
186+
totalsReceived[idx] = Integer.parseInt(response.getHeader("x-total"));
187+
} catch (Exception e) {
188+
errors[idx] = e;
189+
}
190+
statusCodes[idx] = response.getStatusCode();
191+
latch.countDown();
192+
return response;
193+
}
194+
195+
@Override
196+
public void onThrowable(Throwable t) {
197+
errors[idx] = t;
198+
t.printStackTrace();
199+
latch.countDown();
200+
}
201+
});
202+
} catch (IOException e) {
203+
errors[idx] = e;
204+
latch.countDown();
205+
}
206+
}
207+
});
208+
}
209+
210+
try {
211+
latch.await(1, TimeUnit.MINUTES);
212+
} catch (InterruptedException e) {
213+
fail("Latch interrupted");
214+
}
215+
216+
for (int i = 0; i < threadCount; i++) {
217+
assertEquals(200, statusCodes[i]);
218+
assertNull(errors[i]);
219+
assertEquals(tempFile.length(), totalsReceived[i]);
220+
}
221+
}
222+
223+
224+
private static SSLEngineConfigurator createSSLConfig()
225+
throws Exception {
226+
final SSLContextConfigurator sslContextConfigurator =
227+
new SSLContextConfigurator();
228+
final ClassLoader cl = GrizzlyFeedableBodyGeneratorTest.class.getClassLoader();
229+
// override system properties
230+
final URL cacertsUrl = cl.getResource("ssltest-cacerts.jks");
231+
if (cacertsUrl != null) {
232+
sslContextConfigurator.setTrustStoreFile(cacertsUrl.getFile());
233+
sslContextConfigurator.setTrustStorePass("changeit");
234+
}
235+
236+
// override system properties
237+
final URL keystoreUrl = cl.getResource("ssltest-keystore.jks");
238+
if (keystoreUrl != null) {
239+
sslContextConfigurator.setKeyStoreFile(keystoreUrl.getFile());
240+
sslContextConfigurator.setKeyStorePass("changeit");
241+
}
242+
243+
return new SSLEngineConfigurator(
244+
sslContextConfigurator.createSSLContext(),
245+
false, false, false);
246+
}
247+
248+
249+
private void generateTempFile() throws IOException {
250+
tempFile = File.createTempFile("feedable", null);
251+
int total = 0;
252+
byte[] chunk = new byte[1024];
253+
Random r = new Random(System.currentTimeMillis());
254+
FileOutputStream out = new FileOutputStream(tempFile);
255+
while (total < TEMP_FILE_SIZE) {
256+
for (int i = 0; i < chunk.length; i++) {
257+
chunk[i] = DATA[r.nextInt(DATA.length)];
258+
}
259+
out.write(chunk);
260+
total += chunk.length;
261+
}
262+
out.flush();
263+
out.close();
264+
}
265+
266+
267+
// ---------------------------------------------------------- Nested Classes
268+
269+
270+
private static final class ConsumingHandler extends HttpHandler {
271+
272+
273+
// -------------------------------------------- Methods from HttpHandler
274+
275+
276+
@Override
277+
public void service(Request request, Response response)
278+
throws Exception {
279+
int total = 0;
280+
byte[] bytesIn = new byte[2048];
281+
InputStream in = request.getInputStream();
282+
int read;
283+
while ((read = in.read(bytesIn)) != -1) {
284+
total += read;
285+
Thread.sleep(5);
286+
}
287+
response.addHeader("X-Total", Integer.toString(total));
288+
}
289+
290+
} // END ConsumingHandler
291+
292+
}

0 commit comments

Comments
 (0)