Skip to content

Commit fd5fc9a

Browse files
committed
Make WebSocketFrame reads fully async
Make WebSocketFrame reads fully async. Process close requests via the message queue, so that anything received prior to the close frame is handled
1 parent f852a19 commit fd5fc9a

File tree

4 files changed

+244
-172
lines changed

4 files changed

+244
-172
lines changed

websocket-sharp/Ext.cs

Lines changed: 41 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -618,37 +618,52 @@ internal static byte[] ReadBytes (this Stream stream, long length, int bufferLen
618618
}
619619
}
620620

621-
internal static void ReadBytesAsync (
621+
internal static void ReadBytesAsync(
622622
this Stream stream, int length, Action<byte[]> completed, Action<Exception> error)
623623
{
624624
var buff = new byte[length];
625-
stream.BeginRead (
626-
buff,
627-
0,
628-
length,
629-
ar => {
630-
try {
631-
byte[] bytes = null;
632-
try {
633-
var len = stream.EndRead (ar);
634-
bytes = len < 1
635-
? EmptyByteArray
636-
: len < length
637-
? stream.readBytes (buff, len, length - len)
638-
: buff;
639-
}
640-
catch {
641-
bytes = EmptyByteArray;
642-
}
643-
644-
if (completed != null)
645-
completed (bytes);
625+
ReadBytesAsync(stream, buff, 0, completed, error);
626+
}
627+
628+
internal static void ReadBytesAsync(
629+
this Stream stream, byte[] buff, int currentOffset, Action<byte[]> completed, Action<Exception> error)
630+
{
631+
AsyncCallback readHandler = (ar) =>
632+
{
633+
try
634+
{
635+
var readLength = stream.EndRead(ar);
636+
if (readLength == 0)
637+
{
638+
// EOF/disconnect before reading full length
639+
completed(EmptyByteArray);
640+
return;
646641
}
647-
catch (Exception ex) {
648-
if (error != null)
649-
error (ex);
642+
643+
currentOffset += readLength;
644+
if (currentOffset == buff.Length)
645+
{
646+
// we read everything we needed to read
647+
completed(buff);
650648
}
651-
},
649+
else
650+
{
651+
// need to read more
652+
ReadBytesAsync(stream, buff, currentOffset, completed, error);
653+
}
654+
}
655+
catch (Exception ex)
656+
{
657+
if (error != null)
658+
error(ex);
659+
}
660+
};
661+
662+
stream.BeginRead(
663+
buff,
664+
currentOffset,
665+
buff.Length - currentOffset,
666+
readHandler,
652667
null);
653668
}
654669

websocket-sharp/MessageEventArgs.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class MessageEventArgs : EventArgs
5252
private string _data;
5353
private bool _dataSet;
5454
private Opcode _opcode;
55+
private PayloadData _payloadData;
5556
private byte[] _rawData;
5657

5758
#endregion
@@ -61,6 +62,7 @@ public class MessageEventArgs : EventArgs
6162
internal MessageEventArgs (WebSocketFrame frame)
6263
{
6364
_opcode = frame.Opcode;
65+
_payloadData = frame.PayloadData;
6466
_rawData = frame.PayloadData.ApplicationData;
6567
}
6668

@@ -110,6 +112,14 @@ public byte[] RawData {
110112
}
111113
}
112114

115+
internal PayloadData PayloadData
116+
{
117+
get
118+
{
119+
return _payloadData ?? (_payloadData = new PayloadData(_rawData));
120+
}
121+
}
122+
113123
/// <summary>
114124
/// Gets the type of the message.
115125
/// </summary>

websocket-sharp/WebSocket.cs

Lines changed: 67 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1001,8 +1001,7 @@ private void open ()
10011001

10021002
private bool processCloseFrame (WebSocketFrame frame)
10031003
{
1004-
var payload = frame.PayloadData;
1005-
close (new CloseEventArgs (payload), !payload.IncludesReservedCloseStatusCode, false);
1004+
enqueueToMessageEventQueue(new MessageEventArgs(frame));
10061005

10071006
return false;
10081007
}
@@ -1461,43 +1460,80 @@ private void setClientStream ()
14611460
}
14621461
}
14631462

1464-
private void startReceiving ()
1463+
private void startReceiving()
14651464
{
14661465
if (_messageEventQueue.Count > 0)
1467-
_messageEventQueue.Clear ();
1466+
_messageEventQueue.Clear();
14681467

1469-
_exitReceiving = new AutoResetEvent (false);
1470-
_receivePong = new AutoResetEvent (false);
1468+
_exitReceiving = new AutoResetEvent(false);
1469+
_receivePong = new AutoResetEvent(false);
14711470

14721471
Action receive = null;
1473-
receive = () => WebSocketFrame.ReadAsync (
1474-
_stream,
1475-
false,
1476-
frame => {
1477-
if (processReceivedFrame (frame) && _readyState != WebSocketState.Closed) {
1478-
receive ();
1479-
1480-
if ((frame.IsControl && !(frame.IsPing && _emitOnPing)) || !frame.IsFinal)
1481-
return;
14821472

1483-
lock (_forEvent) {
1484-
try {
1485-
var e = dequeueFromMessageEventQueue ();
1486-
if (e != null && _readyState == WebSocketState.Open)
1487-
OnMessage.Emit (this, e);
1488-
}
1489-
catch (Exception ex) {
1490-
processException (ex, "An exception has occurred during an OnMessage event.");
1491-
}
1492-
}
1493-
}
1494-
else if (_exitReceiving != null) {
1495-
_exitReceiving.Set ();
1473+
Action<WebSocketFrame> messageHandler = (frame) =>
1474+
{
1475+
if (processReceivedFrame(frame) && _readyState != WebSocketState.Closed)
1476+
{
1477+
receive();
1478+
1479+
if ((frame.IsControl && !(frame.IsPing && _emitOnPing)) || !frame.IsFinal)
1480+
return;
1481+
1482+
handleOneMessage();
1483+
}
1484+
else
1485+
{
1486+
while (_messageEventQueue.Count > 0)
1487+
{
1488+
handleOneMessage();
14961489
}
1497-
},
1498-
ex => processException (ex, "An exception has occurred while receiving a message."));
14991490

1500-
receive ();
1491+
if (_exitReceiving != null)
1492+
{
1493+
_exitReceiving.Set();
1494+
}
1495+
}
1496+
};
1497+
1498+
Action<Exception> errorHandler = (ex) =>
1499+
{
1500+
processException(ex, "An exception has occurred while receiving a message.");
1501+
};
1502+
1503+
receive = () => WebSocketFrame.Read(
1504+
_stream,
1505+
false,
1506+
messageHandler,
1507+
errorHandler);
1508+
1509+
receive();
1510+
}
1511+
1512+
private void handleOneMessage()
1513+
{
1514+
lock (_forEvent)
1515+
{
1516+
try
1517+
{
1518+
MessageEventArgs e = dequeueFromMessageEventQueue();
1519+
if (e != null && _readyState == WebSocketState.Open)
1520+
{
1521+
if (e.Type == Opcode.Close)
1522+
{
1523+
var payload = e.PayloadData;
1524+
close(new CloseEventArgs(payload), !payload.IncludesReservedCloseStatusCode, false);
1525+
}
1526+
else
1527+
{
1528+
OnMessage.Emit(this, e);
1529+
}
1530+
}
1531+
}
1532+
catch (Exception ex)
1533+
{
1534+
processException(ex, "An exception has occurred during an OnMessage event.");
1535+
}
1536+
}
15011537
}
15021538

15031539
// As client

0 commit comments

Comments
 (0)