Skip to content

Commit 5ee0bb4

Browse files
authored
1565 high cpu load from asyncsignal (#1566)
1 parent 5c74728 commit 5ee0bb4

File tree

6 files changed

+67
-20
lines changed

6 files changed

+67
-20
lines changed

Source/MQTTnet.Tests/Internal/AsyncSignal_Tests.cs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,26 @@ public async Task Dispose_Properly()
5252
}
5353

5454
[TestMethod]
55-
public async Task Loop_Signal()
55+
public async Task Reset_Signal()
5656
{
5757
var asyncSignal = new AsyncSignal();
5858

59+
// WaitAsync should fail because no signal is available.
5960
for (var i = 0; i < 10; i++)
6061
{
62+
try
63+
{
64+
using (var timeout = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)))
65+
{
66+
await asyncSignal.WaitAsync(timeout.Token);
67+
}
68+
69+
Assert.Fail("This must fail because the signal is not yet set.");
70+
}
71+
catch (OperationCanceledException)
72+
{
73+
}
74+
6175
asyncSignal.Set();
6276

6377
// WaitAsync should return directly because the signal is available.

Source/MQTTnet.Tests/Server/Load_Tests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ await client.PublishAsync(message)
5555
});
5656
}
5757

58-
SpinWait.SpinUntil(() => receivedMessages == 100000, TimeSpan.FromSeconds(90));
58+
SpinWait.SpinUntil(() => receivedMessages == 100000, TimeSpan.FromSeconds(120));
5959

6060
Assert.AreEqual(100000, receivedMessages);
6161
}

Source/MQTTnet.Tests/Server/Session_Tests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ void OnReceive()
150150
// Try to connect 50 clients at the same time.
151151
var clients = await Task.WhenAll(Enumerable.Range(0, 50).Select(i => ConnectAndSubscribe(testEnvironment, options, OnReceive)));
152152

153-
await Task.Delay(5000);
153+
await Task.Delay(TimeSpan.FromSeconds(10));
154154

155155
var connectedClients = clients.Where(c => c?.TryPingAsync().GetAwaiter().GetResult() == true).ToList();
156156

@@ -162,7 +162,7 @@ void OnReceive()
162162
var sendClient = await testEnvironment.ConnectClient(option2);
163163
await sendClient.PublishStringAsync("aaa", "1");
164164

165-
await Task.Delay(3000);
165+
await Task.Delay(TimeSpan.FromSeconds(5));
166166

167167
Assert.AreEqual(true, hasReceive);
168168
}

Source/MQTTnet/Internal/AsyncQueue.cs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public void Dispose()
2929
{
3030
lock (_syncRoot)
3131
{
32-
_signal?.Dispose();
32+
_signal.Dispose();
3333

3434
_isDisposed = true;
3535
}
@@ -48,10 +48,10 @@ public AsyncQueueDequeueResult<TItem> TryDequeue()
4848
{
4949
if (_queue.TryDequeue(out var item))
5050
{
51-
return new AsyncQueueDequeueResult<TItem>(true, item);
51+
return AsyncQueueDequeueResult<TItem>.Success(item);
5252
}
5353

54-
return new AsyncQueueDequeueResult<TItem>(false, default);
54+
return AsyncQueueDequeueResult<TItem>.NonSuccess;
5555
}
5656

5757
public async Task<AsyncQueueDequeueResult<TItem>> TryDequeueAsync(CancellationToken cancellationToken)
@@ -60,36 +60,42 @@ public async Task<AsyncQueueDequeueResult<TItem>> TryDequeueAsync(CancellationTo
6060
{
6161
try
6262
{
63-
Task task;
63+
Task task = null;
6464
lock (_syncRoot)
6565
{
6666
if (_isDisposed)
6767
{
68-
return new AsyncQueueDequeueResult<TItem>(false, default);
68+
return AsyncQueueDequeueResult<TItem>.NonSuccess;
6969
}
7070

71-
task = _signal.WaitAsync(cancellationToken);
71+
if (_queue.IsEmpty)
72+
{
73+
task = _signal.WaitAsync(cancellationToken);
74+
}
7275
}
7376

74-
await task.ConfigureAwait(false);
75-
77+
if (task != null)
78+
{
79+
await task.ConfigureAwait(false);
80+
}
81+
7682
if (cancellationToken.IsCancellationRequested)
7783
{
78-
return new AsyncQueueDequeueResult<TItem>(false, default);
84+
return AsyncQueueDequeueResult<TItem>.NonSuccess;
7985
}
8086

8187
if (_queue.TryDequeue(out var item))
8288
{
83-
return new AsyncQueueDequeueResult<TItem>(true, item);
89+
return AsyncQueueDequeueResult<TItem>.Success(item);
8490
}
8591
}
8692
catch (OperationCanceledException)
8793
{
88-
return new AsyncQueueDequeueResult<TItem>(false, default);
94+
return AsyncQueueDequeueResult<TItem>.NonSuccess;
8995
}
9096
}
9197

92-
return new AsyncQueueDequeueResult<TItem>(false, default);
98+
return AsyncQueueDequeueResult<TItem>.NonSuccess;
9399
}
94100
}
95101
}

Source/MQTTnet/Internal/AsyncQueueDequeueResult.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44

55
namespace MQTTnet.Internal
66
{
7-
public class AsyncQueueDequeueResult<TItem>
7+
public sealed class AsyncQueueDequeueResult<TItem>
88
{
9+
public static readonly AsyncQueueDequeueResult<TItem> NonSuccess = new AsyncQueueDequeueResult<TItem>(false, default);
10+
911
public AsyncQueueDequeueResult(bool isSuccess, TItem item)
1012
{
1113
IsSuccess = isSuccess;
@@ -15,5 +17,10 @@ public AsyncQueueDequeueResult(bool isSuccess, TItem item)
1517
public bool IsSuccess { get; }
1618

1719
public TItem Item { get; }
20+
21+
public static AsyncQueueDequeueResult<TItem> Success(TItem item)
22+
{
23+
return new AsyncQueueDequeueResult<TItem>(true, item);
24+
}
1825
}
1926
}

Source/MQTTnet/Internal/AsyncSignal.cs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,17 @@ public void Set()
3333
{
3434
_isSignaled = true;
3535

36+
Cleanup();
37+
3638
// If there is already a waiting task let it run.
37-
_waiter?.Approve();
38-
_waiter = null;
39+
if (_waiter != null)
40+
{
41+
_waiter.Approve();
42+
_waiter = null;
43+
44+
// Since we already got a waiter the signal must be reset right now!
45+
_isSignaled = false;
46+
}
3947
}
4048
}
4149

@@ -47,8 +55,11 @@ public Task WaitAsync(CancellationToken cancellationToken = default)
4755
{
4856
ThrowIfDisposed();
4957

58+
Cleanup();
59+
5060
if (_isSignaled)
5161
{
62+
_isSignaled = false;
5263
return CompletedTask.Instance;
5364
}
5465

@@ -58,7 +69,7 @@ public Task WaitAsync(CancellationToken cancellationToken = default)
5869
{
5970
throw new InvalidOperationException("Only one waiting task is permitted per async signal.");
6071
}
61-
72+
6273
_waiter.Dispose();
6374
}
6475

@@ -67,6 +78,15 @@ public Task WaitAsync(CancellationToken cancellationToken = default)
6778
}
6879
}
6980

81+
void Cleanup()
82+
{
83+
// Cleanup if the previous waiter was cancelled.
84+
if (_waiter != null && _waiter.Task.IsCanceled)
85+
{
86+
_waiter = null;
87+
}
88+
}
89+
7090
void ThrowIfDisposed()
7191
{
7292
if (_isDisposed)

0 commit comments

Comments
 (0)