Skip to content

Commit df16283

Browse files
committed
Final fix for AsyncHttpClient#317: allow sending and receiving message in onSucces
1 parent c7c3ce8 commit df16283

File tree

1 file changed

+23
-11
lines changed

1 file changed

+23
-11
lines changed

providers/netty/src/main/java/org/asynchttpclient/providers/netty/NettyAsyncHttpProvider.java

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.List;
3939
import java.util.Map.Entry;
4040
import java.util.concurrent.Callable;
41+
import java.util.concurrent.CountDownLatch;
4142
import java.util.concurrent.ExecutionException;
4243
import java.util.concurrent.ExecutorService;
4344
import java.util.concurrent.Executors;
@@ -2209,10 +2210,11 @@ private final class WebSocketProtocol implements Protocol {
22092210
private static final byte OPCODE_UNKNOWN = -1;
22102211

22112212
protected byte pendingOpcode = OPCODE_UNKNOWN;
2213+
private final CountDownLatch onSuccessLatch = new CountDownLatch(1);
22122214

22132215
// @Override
22142216
public void handle(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
2215-
NettyResponseFuture<?> future = NettyResponseFuture.class.cast(ctx.getAttachment());
2217+
NettyResponseFuture future = NettyResponseFuture.class.cast(ctx.getAttachment());
22162218
WebSocketUpgradeHandler h = WebSocketUpgradeHandler.class.cast(future.getAsyncHandler());
22172219
Request request = future.getRequest();
22182220

@@ -2221,7 +2223,7 @@ public void handle(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
22212223

22222224
HttpResponseStatus s = new ResponseStatus(future.getURI(), response, NettyAsyncHttpProvider.this);
22232225
HttpResponseHeaders responseHeaders = new ResponseHeaders(future.getURI(), response, NettyAsyncHttpProvider.this);
2224-
FilterContext fc = new FilterContext.FilterContextBuilder().asyncHandler(h).request(request).responseStatus(s).responseHeaders(responseHeaders).build();
2226+
FilterContext<?> fc = new FilterContext.FilterContextBuilder().asyncHandler(h).request(request).responseStatus(s).responseHeaders(responseHeaders).build();
22252227
for (ResponseFilter asyncFilter : config.getResponseFilters()) {
22262228
try {
22272229
fc = asyncFilter.filter(fc);
@@ -2261,29 +2263,39 @@ public void handle(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
22612263
s = new ResponseStatus(future.getURI(), response, NettyAsyncHttpProvider.this);
22622264
final boolean statusReceived = h.onStatusReceived(s) == STATE.UPGRADE;
22632265

2264-
if (!statusReceived) {
2265-
h.onClose(new NettyWebSocket(ctx.getChannel()), 1002, "Bad response status " + response.getStatus().getCode());
2266-
future.done(null);
2266+
if (!validStatus || !validUpgrade || !validConnection || !statusReceived) {
2267+
abort(future, new IOException("Invalid handshake response"));
22672268
return;
22682269
}
22692270

2270-
if (!validStatus || !validUpgrade || !validConnection) {
2271-
throw new IOException("Invalid handshake response");
2272-
}
2273-
22742271
String accept = response.getHeader("Sec-WebSocket-Accept");
22752272
String key = WebSocketUtil.getAcceptKey(future.getNettyRequest().getHeader(WEBSOCKET_KEY));
22762273
if (accept == null || !accept.equals(key)) {
22772274
throw new IOException(String.format("Invalid challenge. Actual: %s. Expected: %s", accept, key));
22782275
}
22792276

2277+
ctx.getPipeline().replace("http-encoder", "ws-encoder", new WebSocket08FrameEncoder(true));
22802278
ctx.getPipeline().get(HttpResponseDecoder.class).replace("ws-decoder", new WebSocket08FrameDecoder(false, false));
22812279
if (h.onHeadersReceived(responseHeaders) == STATE.CONTINUE) {
2282-
h.onSuccess(new NettyWebSocket(ctx.getChannel()));
2280+
try {
2281+
h.onSuccess(new NettyWebSocket(ctx.getChannel()));
2282+
} catch (Exception ex) {
2283+
NettyAsyncHttpProvider.this.log.warn("onSuccess unexexpected exception", ex);
2284+
} finally {
2285+
/**
2286+
* A websocket message may always be included with the handshake response. As soon as we replace
2287+
* the ws-decoder, this class can be called and we are still inside the onSuccess processing
2288+
* causing invalid state.
2289+
*/
2290+
onSuccessLatch.countDown();
2291+
}
22832292
}
2284-
ctx.getPipeline().replace("http-encoder", "ws-encoder", new WebSocket08FrameEncoder(true));
22852293
future.done(null);
22862294
} else if (e.getMessage() instanceof WebSocketFrame) {
2295+
2296+
// Give a chance to the onSuccess to complete before processing message.
2297+
onSuccessLatch.await();
2298+
22872299
final WebSocketFrame frame = (WebSocketFrame) e.getMessage();
22882300

22892301
if (frame instanceof TextWebSocketFrame) {

0 commit comments

Comments
 (0)