Skip to content

Commit 7e686d9

Browse files
committed
Refactor code.
1 parent d23bbf7 commit 7e686d9

File tree

3 files changed

+29
-35
lines changed

3 files changed

+29
-35
lines changed

Source/MQTTnet/Client/MqttClient.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,7 @@ public event Func<InspectMqttPacketEventArgs, Task> InspectPackage
9191
add => _inspectPacketEvent.AddHandler(value);
9292
remove => _inspectPacketEvent.RemoveHandler(value);
9393
}
94-
95-
[Obsolete(
96-
"This property will be removed in the future. Checking for a working connection should be done via calling _PinAsync_. Also other successful traffic can be used to indicate connection status.")]
94+
9795
public bool IsConnected => (MqttClientConnectionStatus)_connectionStatus == MqttClientConnectionStatus.Connected;
9896

9997
public MqttClientOptions Options { get; private set; }

Source/MQTTnet/Internal/AsyncLock.cs

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ namespace MQTTnet.Internal
1010
{
1111
public sealed class AsyncLock : IDisposable
1212
{
13-
readonly object _syncRoot = new object();
1413
readonly Task<IDisposable> _releaser;
14+
readonly object _syncRoot = new object();
1515

1616
SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
1717

@@ -20,6 +20,15 @@ public AsyncLock()
2020
_releaser = Task.FromResult((IDisposable)new Releaser(this));
2121
}
2222

23+
public void Dispose()
24+
{
25+
lock (_syncRoot)
26+
{
27+
_semaphore?.Dispose();
28+
_semaphore = null;
29+
}
30+
}
31+
2332
public Task<IDisposable> WaitAsync(CancellationToken cancellationToken)
2433
{
2534
Task task;
@@ -45,22 +54,10 @@ public Task<IDisposable> WaitAsync(CancellationToken cancellationToken)
4554
}
4655

4756
// Wait for the _WaitAsync_ method and return the releaser afterwards.
48-
return task.ContinueWith(
49-
(_, state) => (IDisposable)state,
50-
_releaser.Result,
51-
cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
52-
}
53-
54-
public void Dispose()
55-
{
56-
lock (_syncRoot)
57-
{
58-
_semaphore?.Dispose();
59-
_semaphore = null;
60-
}
57+
return task.ContinueWith((_, state) => (IDisposable)state, _releaser.Result, cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
6158
}
6259

63-
internal void Release()
60+
void Release()
6461
{
6562
lock (_syncRoot)
6663
{
@@ -83,4 +80,4 @@ public void Dispose()
8380
}
8481
}
8582
}
86-
}
83+
}

Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ public sealed class MqttClientSessionsManager : ISubscriptionChangedNotification
2323
{
2424
readonly Dictionary<string, MqttClient> _clients = new Dictionary<string, MqttClient>(4096);
2525

26+
readonly AsyncLock _createConnectionSyncRoot = new AsyncLock();
27+
2628
readonly MqttServerEventContainer _eventContainer;
2729
readonly MqttNetSourceLogger _logger;
2830
readonly MqttServerOptions _options;
@@ -38,8 +40,6 @@ public sealed class MqttClientSessionsManager : ISubscriptionChangedNotification
3840
readonly object _sessionsManagementLock = new object();
3941
readonly HashSet<MqttSession> _subscriberSessions = new HashSet<MqttSession>();
4042

41-
readonly SemaphoreSlim _createConnectionSyncRoot = new SemaphoreSlim(1, 1);
42-
4343
public MqttClientSessionsManager(
4444
MqttServerOptions options,
4545
MqttRetainedMessagesManager retainedMessagesManager,
@@ -77,7 +77,7 @@ public async Task CloseAllConnectionsAsync()
7777
public async Task DeleteSessionAsync(string clientId)
7878
{
7979
_logger.Verbose("Deleting session for client '{0}'.", clientId);
80-
80+
8181
MqttClient connection;
8282

8383
lock (_clients)
@@ -441,6 +441,11 @@ public Task UnsubscribeAsync(string clientId, ICollection<string> topicFilters)
441441
return GetClientSession(clientId).SubscriptionsManager.Unsubscribe(fakeUnsubscribePacket, CancellationToken.None);
442442
}
443443

444+
MqttClient CreateClient(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter, MqttSession session)
445+
{
446+
return new MqttClient(connectPacket, channelAdapter, session, _options, _eventContainer, this, _rootLogger);
447+
}
448+
444449
async Task<MqttClient> CreateClientConnection(
445450
MqttConnectPacket connectPacket,
446451
MqttConnAckPacket connAckPacket,
@@ -473,9 +478,8 @@ async Task<MqttClient> CreateClientConnection(
473478

474479
sessionShouldPersist = !connectPacket.CleanSession;
475480
}
476-
477-
await _createConnectionSyncRoot.WaitAsync(CancellationToken.None).ConfigureAwait(false);
478-
try
481+
482+
using (await _createConnectionSyncRoot.WaitAsync(CancellationToken.None).ConfigureAwait(false))
479483
{
480484
MqttSession session;
481485
lock (_sessionsManagementLock)
@@ -528,24 +532,19 @@ async Task<MqttClient> CreateClientConnection(
528532

529533
if (_eventContainer.ClientConnectedEvent.HasHandlers)
530534
{
531-
var eventArgs = new ClientDisconnectedEventArgs(existingClient.Id, MqttClientDisconnectType.Takeover, existingClient.Endpoint, existingClient.Session.Items);
535+
var eventArgs = new ClientDisconnectedEventArgs(
536+
existingClient.Id,
537+
MqttClientDisconnectType.Takeover,
538+
existingClient.Endpoint,
539+
existingClient.Session.Items);
532540
await _eventContainer.ClientDisconnectedEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
533541
}
534542
}
535543
}
536-
finally
537-
{
538-
_createConnectionSyncRoot.Release();
539-
}
540544

541545
return client;
542546
}
543547

544-
MqttClient CreateClient(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter, MqttSession session)
545-
{
546-
return new MqttClient(connectPacket, channelAdapter, session, _options, _eventContainer, this, _rootLogger);
547-
}
548-
549548
MqttSession CreateSession(string clientId, IDictionary sessionItems, bool isPersistent)
550549
{
551550
_logger.Verbose("Created new session for client '{0}'.", clientId);

0 commit comments

Comments
 (0)