Skip to content

Commit 5b1cc7d

Browse files
authored
Also cleanup MqttClientSessionsManager._subscriberSessions when session is taken over by a new one with CleanSession=true, fix #1552 (#1553)
1 parent 2842f27 commit 5b1cc7d

File tree

2 files changed

+9
-3
lines changed

2 files changed

+9
-3
lines changed

Source/MQTTnet.Tests/Internal/AsyncSignal_Tests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public async Task Signal()
9090
Assert.IsTrue(stopwatch.ElapsedMilliseconds > 900);
9191
}
9292
}
93-
93+
9494
[TestMethod]
9595
[ExpectedException(typeof(InvalidOperationException))]
9696
public async Task Fail_For_Two_Waiters()

Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -485,9 +485,10 @@ async Task<MqttClient> CreateClientConnection(
485485
using (await _createConnectionSyncRoot.EnterAsync().ConfigureAwait(false))
486486
{
487487
MqttSession session;
488+
MqttSession oldSession;
488489
lock (_sessionsManagementLock)
489490
{
490-
if (!_sessions.TryGetValue(connectPacket.ClientId, out session))
491+
if (!_sessions.TryGetValue(connectPacket.ClientId, out oldSession))
491492
{
492493
session = CreateSession(connectPacket.ClientId, validatingConnectionEventArgs.SessionItems, sessionShouldPersist);
493494
}
@@ -496,11 +497,14 @@ async Task<MqttClient> CreateClientConnection(
496497
if (connectPacket.CleanSession)
497498
{
498499
_logger.Verbose("Deleting existing session of client '{0}' due to clean start.", connectPacket.ClientId);
500+
_subscriberSessions.Remove(oldSession);
499501
session = CreateSession(connectPacket.ClientId, validatingConnectionEventArgs.SessionItems, sessionShouldPersist);
500502
}
501503
else
502504
{
503505
_logger.Verbose("Reusing existing session of client '{0}'.", connectPacket.ClientId);
506+
session = oldSession;
507+
oldSession = null;
504508
// Session persistence could change for MQTT 5 clients that reconnect with different SessionExpiryInterval
505509
session.IsPersistent = sessionShouldPersist;
506510
connAckPacket.IsSessionPresent = true;
@@ -533,7 +537,7 @@ async Task<MqttClient> CreateClientConnection(
533537
existingClient.IsTakenOver = true;
534538
await existingClient.StopAsync(MqttDisconnectReasonCode.SessionTakenOver).ConfigureAwait(false);
535539

536-
if (_eventContainer.ClientConnectedEvent.HasHandlers)
540+
if (_eventContainer.ClientDisconnectedEvent.HasHandlers)
537541
{
538542
var eventArgs = new ClientDisconnectedEventArgs(
539543
existingClient.Id,
@@ -544,6 +548,8 @@ async Task<MqttClient> CreateClientConnection(
544548
await _eventContainer.ClientDisconnectedEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
545549
}
546550
}
551+
552+
oldSession?.Dispose();
547553
}
548554

549555
return client;

0 commit comments

Comments
 (0)