Skip to content

Commit cb25225

Browse files
author
Stephane Landelle
committed
Port dcb6156 on master, close AsyncHttpClient#301
1 parent b64a2dc commit cb25225

File tree

1 file changed

+62
-40
lines changed

1 file changed

+62
-40
lines changed

providers/netty/src/main/java/org/asynchttpclient/providers/netty/ws/NettyWebSocket.java

Lines changed: 62 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ public class NettyWebSocket implements WebSocket {
3737
private final Channel channel;
3838
private final ConcurrentLinkedQueue<WebSocketListener> listeners = new ConcurrentLinkedQueue<WebSocketListener>();
3939

40-
private StringBuilder textBuffer;
41-
private ByteArrayOutputStream byteBuffer;
40+
private final StringBuilder textBuffer = new StringBuilder();
41+
private final ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
4242
private int maxBufferSize = 128000000;
4343

4444
public NettyWebSocket(Channel channel) {
@@ -136,65 +136,87 @@ public void close(int statusCode, String reason) {
136136
}
137137

138138
public void onBinaryFragment(byte[] message, boolean last) {
139-
for (WebSocketListener l : listeners) {
140-
if (l instanceof WebSocketByteListener) {
141-
try {
142-
WebSocketByteListener.class.cast(l).onFragment(message, last);
143139

144-
if (byteBuffer == null) {
145-
byteBuffer = new ByteArrayOutputStream();
146-
}
147-
148-
byteBuffer.write(message);
140+
if (!last) {
141+
try {
142+
byteBuffer.write(message);
143+
} catch (Exception ex) {
144+
byteBuffer.reset();
145+
onError(ex);
146+
return;
147+
}
149148

150-
if (byteBuffer.size() > maxBufferSize) {
151-
Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + getMaxBufferSize());
152-
l.onError(e);
153-
this.close();
154-
return;
155-
}
149+
if (byteBuffer.size() > maxBufferSize) {
150+
byteBuffer.reset();
151+
Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + getMaxBufferSize());
152+
onError(e);
153+
this.close();
154+
return;
155+
}
156+
}
156157

157-
if (last) {
158-
WebSocketByteListener.class.cast(l).onMessage(byteBuffer.toByteArray());
159-
byteBuffer = null;
160-
textBuffer = null;
158+
for (WebSocketListener l : listeners) {
159+
if (l instanceof WebSocketByteListener) {
160+
try {
161+
if (!last) {
162+
WebSocketByteListener.class.cast(l).onFragment(message, last);
163+
} else {
164+
if (byteBuffer.size() > 0) {
165+
byteBuffer.write(message);
166+
WebSocketByteListener.class.cast(l).onFragment(message, last);
167+
WebSocketByteListener.class.cast(l).onMessage(byteBuffer.toByteArray());
168+
} else {
169+
WebSocketByteListener.class.cast(l).onMessage(message);
170+
}
161171
}
162172
} catch (Exception ex) {
163173
l.onError(ex);
164174
}
165175
}
166176
}
177+
178+
if (last) {
179+
byteBuffer.reset();
180+
}
167181
}
168182

169183
public void onTextFragment(String message, boolean last) {
170-
for (WebSocketListener l : listeners) {
171-
if (l instanceof WebSocketTextListener) {
172-
try {
173-
WebSocketTextListener.class.cast(l).onFragment(message, last);
174184

175-
if (textBuffer == null) {
176-
textBuffer = new StringBuilder();
177-
}
178-
179-
textBuffer.append(message);
185+
if (!last) {
186+
textBuffer.append(message);
180187

181-
if (textBuffer.length() > maxBufferSize) {
182-
Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + getMaxBufferSize());
183-
l.onError(e);
184-
this.close();
185-
return;
186-
}
188+
if (textBuffer.length() > maxBufferSize) {
189+
textBuffer.setLength(0);
190+
Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + getMaxBufferSize());
191+
onError(e);
192+
this.close();
193+
return;
194+
}
195+
}
187196

188-
if (last) {
189-
WebSocketTextListener.class.cast(l).onMessage(textBuffer.toString());
190-
byteBuffer = null;
191-
textBuffer = null;
197+
for (WebSocketListener l : listeners) {
198+
if (l instanceof WebSocketTextListener) {
199+
try {
200+
if (!last) {
201+
WebSocketTextListener.class.cast(l).onFragment(message, last);
202+
} else {
203+
if (textBuffer.length() > 0) {
204+
WebSocketTextListener.class.cast(l).onFragment(message, last);
205+
206+
WebSocketTextListener.class.cast(l).onMessage(textBuffer.append(message).toString());
207+
} else {
208+
WebSocketTextListener.class.cast(l).onMessage(message);
209+
}
192210
}
193211
} catch (Exception ex) {
194212
l.onError(ex);
195213
}
196214
}
197215
}
216+
217+
if (last) {
218+
textBuffer.setLength(0);
219+
}
198220
}
199221

200222
public void onError(Throwable t) {

0 commit comments

Comments
 (0)