Skip to content

Commit 6faf1f3

Browse files
committed
Proper Fixes AsyncHttpClient#317
1 parent 137021f commit 6faf1f3

File tree

1 file changed

+97
-99
lines changed

1 file changed

+97
-99
lines changed

providers/netty/src/main/java/org/asynchttpclient/providers/netty/NettyAsyncHttpProvider.java

Lines changed: 97 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -15,42 +15,49 @@
1515
*/
1616
package org.asynchttpclient.providers.netty;
1717

18-
import static org.asynchttpclient.util.AsyncHttpProviderUtils.DEFAULT_CHARSET;
19-
import static org.asynchttpclient.util.DateUtil.millisTime;
20-
import static org.asynchttpclient.util.MiscUtil.isNonEmpty;
21-
import static org.jboss.netty.channel.Channels.pipeline;
22-
23-
import java.io.File;
24-
import java.io.FileInputStream;
25-
import java.io.IOException;
26-
import java.io.RandomAccessFile;
27-
import java.net.ConnectException;
28-
import java.net.InetSocketAddress;
29-
import java.net.MalformedURLException;
30-
import java.net.URI;
31-
import java.nio.channels.ClosedChannelException;
32-
import java.nio.channels.FileChannel;
33-
import java.nio.channels.WritableByteChannel;
34-
import java.nio.charset.Charset;
35-
import java.security.GeneralSecurityException;
36-
import java.security.NoSuchAlgorithmException;
37-
import java.util.ArrayList;
38-
import java.util.List;
39-
import java.util.Map.Entry;
40-
import java.util.concurrent.Callable;
41-
import java.util.concurrent.CountDownLatch;
42-
import java.util.concurrent.ExecutionException;
43-
import java.util.concurrent.ExecutorService;
44-
import java.util.concurrent.Executors;
45-
import java.util.concurrent.Future;
46-
import java.util.concurrent.RejectedExecutionException;
47-
import java.util.concurrent.Semaphore;
48-
import java.util.concurrent.TimeUnit;
49-
import java.util.concurrent.TimeoutException;
50-
import java.util.concurrent.atomic.AtomicBoolean;
51-
52-
import javax.net.ssl.SSLEngine;
53-
18+
import org.asynchttpclient.AsyncHandler;
19+
import org.asynchttpclient.AsyncHandler.STATE;
20+
import org.asynchttpclient.AsyncHttpClientConfig;
21+
import org.asynchttpclient.AsyncHttpProvider;
22+
import org.asynchttpclient.Body;
23+
import org.asynchttpclient.BodyGenerator;
24+
import org.asynchttpclient.ConnectionPoolKeyStrategy;
25+
import org.asynchttpclient.ConnectionsPool;
26+
import org.asynchttpclient.Cookie;
27+
import org.asynchttpclient.FluentCaseInsensitiveStringsMap;
28+
import org.asynchttpclient.HttpResponseBodyPart;
29+
import org.asynchttpclient.HttpResponseHeaders;
30+
import org.asynchttpclient.HttpResponseStatus;
31+
import org.asynchttpclient.ListenableFuture;
32+
import org.asynchttpclient.MaxRedirectException;
33+
import org.asynchttpclient.ProgressAsyncHandler;
34+
import org.asynchttpclient.ProxyServer;
35+
import org.asynchttpclient.RandomAccessBody;
36+
import org.asynchttpclient.Realm;
37+
import org.asynchttpclient.Request;
38+
import org.asynchttpclient.RequestBuilder;
39+
import org.asynchttpclient.Response;
40+
import org.asynchttpclient.filter.FilterContext;
41+
import org.asynchttpclient.filter.FilterException;
42+
import org.asynchttpclient.filter.IOExceptionFilter;
43+
import org.asynchttpclient.filter.ResponseFilter;
44+
import org.asynchttpclient.generators.InputStreamBodyGenerator;
45+
import org.asynchttpclient.listener.TransferCompletionHandler;
46+
import org.asynchttpclient.multipart.MultipartBody;
47+
import org.asynchttpclient.multipart.MultipartRequestEntity;
48+
import org.asynchttpclient.ntlm.NTLMEngine;
49+
import org.asynchttpclient.ntlm.NTLMEngineException;
50+
import org.asynchttpclient.org.jboss.netty.handler.codec.http.CookieDecoder;
51+
import org.asynchttpclient.org.jboss.netty.handler.codec.http.CookieEncoder;
52+
import org.asynchttpclient.providers.netty.FeedableBodyGenerator.FeedListener;
53+
import org.asynchttpclient.providers.netty.spnego.SpnegoEngine;
54+
import org.asynchttpclient.providers.netty.util.CleanupChannelGroup;
55+
import org.asynchttpclient.util.AsyncHttpProviderUtils;
56+
import org.asynchttpclient.util.AuthenticatorUtils;
57+
import org.asynchttpclient.util.ProxyUtils;
58+
import org.asynchttpclient.util.SslUtils;
59+
import org.asynchttpclient.util.UTF8UrlEncoder;
60+
import org.asynchttpclient.websocket.WebSocketUpgradeHandler;
5461
import org.jboss.netty.bootstrap.ClientBootstrap;
5562
import org.jboss.netty.buffer.ChannelBuffer;
5663
import org.jboss.netty.buffer.ChannelBufferOutputStream;
@@ -97,49 +104,39 @@
97104
import org.slf4j.Logger;
98105
import org.slf4j.LoggerFactory;
99106

100-
import org.asynchttpclient.AsyncHandler;
101-
import org.asynchttpclient.AsyncHandler.STATE;
102-
import org.asynchttpclient.AsyncHttpClientConfig;
103-
import org.asynchttpclient.AsyncHttpProvider;
104-
import org.asynchttpclient.Body;
105-
import org.asynchttpclient.BodyGenerator;
106-
import org.asynchttpclient.ConnectionPoolKeyStrategy;
107-
import org.asynchttpclient.ConnectionsPool;
108-
import org.asynchttpclient.Cookie;
109-
import org.asynchttpclient.FluentCaseInsensitiveStringsMap;
110-
import org.asynchttpclient.HttpResponseBodyPart;
111-
import org.asynchttpclient.HttpResponseHeaders;
112-
import org.asynchttpclient.HttpResponseStatus;
113-
import org.asynchttpclient.ListenableFuture;
114-
import org.asynchttpclient.MaxRedirectException;
115-
import org.asynchttpclient.ProgressAsyncHandler;
116-
import org.asynchttpclient.ProxyServer;
117-
import org.asynchttpclient.RandomAccessBody;
118-
import org.asynchttpclient.Realm;
119-
import org.asynchttpclient.Request;
120-
import org.asynchttpclient.RequestBuilder;
121-
import org.asynchttpclient.Response;
122-
import org.asynchttpclient.filter.FilterContext;
123-
import org.asynchttpclient.filter.FilterException;
124-
import org.asynchttpclient.filter.IOExceptionFilter;
125-
import org.asynchttpclient.filter.ResponseFilter;
126-
import org.asynchttpclient.generators.InputStreamBodyGenerator;
127-
import org.asynchttpclient.listener.TransferCompletionHandler;
128-
import org.asynchttpclient.ntlm.NTLMEngine;
129-
import org.asynchttpclient.ntlm.NTLMEngineException;
130-
import org.asynchttpclient.providers.netty.FeedableBodyGenerator.FeedListener;
131-
import org.asynchttpclient.providers.netty.spnego.SpnegoEngine;
132-
import org.asynchttpclient.providers.netty.util.CleanupChannelGroup;
133-
import org.asynchttpclient.websocket.WebSocketUpgradeHandler;
134-
import org.asynchttpclient.multipart.MultipartBody;
135-
import org.asynchttpclient.multipart.MultipartRequestEntity;
136-
import org.asynchttpclient.util.AsyncHttpProviderUtils;
137-
import org.asynchttpclient.util.AuthenticatorUtils;
138-
import org.asynchttpclient.util.ProxyUtils;
139-
import org.asynchttpclient.util.SslUtils;
140-
import org.asynchttpclient.util.UTF8UrlEncoder;
141-
import org.asynchttpclient.org.jboss.netty.handler.codec.http.CookieEncoder;
142-
import org.asynchttpclient.org.jboss.netty.handler.codec.http.CookieDecoder;
107+
import javax.net.ssl.SSLEngine;
108+
import java.io.File;
109+
import java.io.FileInputStream;
110+
import java.io.IOException;
111+
import java.io.RandomAccessFile;
112+
import java.net.ConnectException;
113+
import java.net.InetSocketAddress;
114+
import java.net.MalformedURLException;
115+
import java.net.URI;
116+
import java.nio.channels.ClosedChannelException;
117+
import java.nio.channels.FileChannel;
118+
import java.nio.channels.WritableByteChannel;
119+
import java.nio.charset.Charset;
120+
import java.security.GeneralSecurityException;
121+
import java.security.NoSuchAlgorithmException;
122+
import java.util.ArrayList;
123+
import java.util.List;
124+
import java.util.Map.Entry;
125+
import java.util.concurrent.Callable;
126+
import java.util.concurrent.ExecutionException;
127+
import java.util.concurrent.ExecutorService;
128+
import java.util.concurrent.Executors;
129+
import java.util.concurrent.Future;
130+
import java.util.concurrent.RejectedExecutionException;
131+
import java.util.concurrent.Semaphore;
132+
import java.util.concurrent.TimeUnit;
133+
import java.util.concurrent.TimeoutException;
134+
import java.util.concurrent.atomic.AtomicBoolean;
135+
136+
import static org.asynchttpclient.util.AsyncHttpProviderUtils.DEFAULT_CHARSET;
137+
import static org.asynchttpclient.util.DateUtil.millisTime;
138+
import static org.asynchttpclient.util.MiscUtil.isNonEmpty;
139+
import static org.jboss.netty.channel.Channels.pipeline;
143140

144141
public class NettyAsyncHttpProvider extends SimpleChannelUpstreamHandler implements AsyncHttpProvider {
145142
private final static String WEBSOCKET_KEY = "Sec-WebSocket-Key";
@@ -2205,12 +2202,23 @@ public void onClose(ChannelHandlerContext ctx, ChannelStateEvent e) {
22052202
}
22062203

22072204
private final class WebSocketProtocol implements Protocol {
2205+
private static final byte OPCODE_CONT = 0x0;
22082206
private static final byte OPCODE_TEXT = 0x1;
22092207
private static final byte OPCODE_BINARY = 0x2;
22102208
private static final byte OPCODE_UNKNOWN = -1;
2211-
22122209
protected byte pendingOpcode = OPCODE_UNKNOWN;
2213-
private final CountDownLatch onSuccessLatch = new CountDownLatch(1);
2210+
private final AtomicBoolean onSuccesInvoked = new AtomicBoolean();
2211+
2212+
// We don't need to synchronize as replacing the "ws-decoder" will process using the same thread.
2213+
private void invokeOnSucces(ChannelHandlerContext ctx, WebSocketUpgradeHandler h) {
2214+
if (!onSuccesInvoked.getAndSet(true)) {
2215+
try {
2216+
h.onSuccess(new NettyWebSocket(ctx.getChannel()));
2217+
} catch (Exception ex) {
2218+
NettyAsyncHttpProvider.this.log.warn("onSuccess unexexpected exception", ex);
2219+
}
2220+
}
2221+
}
22142222

22152223
// @Override
22162224
public void handle(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
@@ -2263,7 +2271,8 @@ public void handle(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
22632271
s = new ResponseStatus(future.getURI(), response, NettyAsyncHttpProvider.this);
22642272
final boolean statusReceived = h.onStatusReceived(s) == STATE.UPGRADE;
22652273

2266-
if (!validStatus || !validUpgrade || !validConnection || !statusReceived) {
2274+
final boolean headeOK = h.onHeadersReceived(responseHeaders) == STATE.CONTINUE;
2275+
if (!headeOK || !validStatus || !validUpgrade || !validConnection || !statusReceived) {
22672276
abort(future, new IOException("Invalid handshake response"));
22682277
return;
22692278
}
@@ -2276,25 +2285,12 @@ public void handle(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
22762285

22772286
ctx.getPipeline().replace("http-encoder", "ws-encoder", new WebSocket08FrameEncoder(true));
22782287
ctx.getPipeline().get(HttpResponseDecoder.class).replace("ws-decoder", new WebSocket08FrameDecoder(false, false));
2279-
if (h.onHeadersReceived(responseHeaders) == STATE.CONTINUE) {
2280-
try {
2281-
h.onSuccess(new NettyWebSocket(ctx.getChannel()));
2282-
} catch (Exception ex) {
2283-
NettyAsyncHttpProvider.this.log.warn("onSuccess unexexpected exception", ex);
2284-
} finally {
2285-
/**
2286-
* A websocket message may always be included with the handshake response. As soon as we replace
2287-
* the ws-decoder, this class can be called and we are still inside the onSuccess processing
2288-
* causing invalid state.
2289-
*/
2290-
onSuccessLatch.countDown();
2291-
}
2292-
}
2288+
2289+
invokeOnSucces(ctx, h);
22932290
future.done(null);
22942291
} else if (e.getMessage() instanceof WebSocketFrame) {
22952292

2296-
// Give a chance to the onSuccess to complete before processing message.
2297-
onSuccessLatch.await();
2293+
invokeOnSucces(ctx, h);
22982294

22992295
final WebSocketFrame frame = (WebSocketFrame) e.getMessage();
23002296

@@ -2339,6 +2335,7 @@ public void setContent(ChannelBuffer content) {
23392335

23402336
if (CloseWebSocketFrame.class.isAssignableFrom(frame.getClass())) {
23412337
try {
2338+
ctx.setAttachment(DiscardEvent.class);
23422339
webSocket.onClose(CloseWebSocketFrame.class.cast(frame).getStatusCode(), CloseWebSocketFrame.class.cast(frame).getReasonText());
23432340
} catch (Throwable t) {
23442341
// Swallow any exception that may comes from a Netty version released before 3.4.0
@@ -2387,7 +2384,8 @@ public void onClose(ChannelHandlerContext ctx, ChannelStateEvent e) {
23872384
WebSocketUpgradeHandler h = WebSocketUpgradeHandler.class.cast(nettyResponse.getAsyncHandler());
23882385
NettyWebSocket webSocket = NettyWebSocket.class.cast(h.onCompleted());
23892386

2390-
webSocket.close(1006, "Connection was closed abnormally (that is, with no close frame being sent).");
2387+
if (ctx.getAttachment() == null || !DiscardEvent.class.isAssignableFrom(ctx.getAttachment().getClass()))
2388+
webSocket.close(1006, "Connection was closed abnormally (that is, with no close frame being sent).");
23912389
} catch (Throwable t) {
23922390
log.error("onError", t);
23932391
}

0 commit comments

Comments
 (0)