@@ -121,19 +121,19 @@ public async Task<MqttClientConnectResult> ConnectAsync(MqttClientOptions option
121121 var mqttClientAliveToken = _mqttClientAlive . Token ;
122122
123123 var adapter = _adapterFactory . CreateClientAdapter ( options , new MqttPacketInspector ( _events . InspectPacketEvent , _rootLogger ) , _rootLogger ) ;
124- _adapter = adapter ;
124+ _adapter = adapter ?? throw new InvalidOperationException ( "The adapter factory did not provide an adapter." ) ;
125125
126126 if ( cancellationToken . CanBeCanceled )
127127 {
128- connectResult = await ConnectInternal ( cancellationToken ) . ConfigureAwait ( false ) ;
128+ connectResult = await ConnectInternal ( adapter , cancellationToken ) . ConfigureAwait ( false ) ;
129129 }
130130 else
131131 {
132132 // Fall back to the general timeout specified in the options if the user passed
133133 // CancellationToken.None or similar.
134134 using ( var timeout = new CancellationTokenSource ( Options . Timeout ) )
135135 {
136- connectResult = await ConnectInternal ( timeout . Token ) . ConfigureAwait ( false ) ;
136+ connectResult = await ConnectInternal ( adapter , timeout . Token ) . ConfigureAwait ( false ) ;
137137 }
138138 }
139139
@@ -421,7 +421,7 @@ Task AcknowledgeReceivedPublishPacket(MqttApplicationMessageReceivedEventArgs ev
421421 return CompletedTask . Instance ;
422422 }
423423
424- async Task < MqttClientConnectResult > Authenticate ( MqttClientOptions options , CancellationToken cancellationToken )
424+ async Task < MqttClientConnectResult > Authenticate ( IMqttChannelAdapter channelAdapter , MqttClientOptions options , CancellationToken cancellationToken )
425425 {
426426 MqttClientConnectResult result ;
427427
@@ -435,7 +435,7 @@ async Task<MqttClientConnectResult> Authenticate(MqttClientOptions options, Canc
435435 if ( receivedPacket is MqttConnAckPacket connAckPacket )
436436 {
437437 var clientConnectResultFactory = new MqttClientConnectResultFactory ( ) ;
438- result = clientConnectResultFactory . Create ( connAckPacket , _adapter . PacketFormatterAdapter . ProtocolVersion ) ;
438+ result = clientConnectResultFactory . Create ( connAckPacket , channelAdapter . PacketFormatterAdapter . ProtocolVersion ) ;
439439 }
440440 else
441441 {
@@ -484,7 +484,7 @@ MqttClientConnectionStatus CompareExchangeConnectionStatus(MqttClientConnectionS
484484 return ( MqttClientConnectionStatus ) Interlocked . CompareExchange ( ref _connectionStatus , ( int ) value , ( int ) comparand ) ;
485485 }
486486
487- async Task < MqttClientConnectResult > ConnectInternal ( CancellationToken cancellationToken )
487+ async Task < MqttClientConnectResult > ConnectInternal ( IMqttChannelAdapter channelAdapter , CancellationToken cancellationToken )
488488 {
489489 var backgroundCancellationToken = _mqttClientAlive . Token ;
490490
@@ -497,7 +497,7 @@ async Task<MqttClientConnectResult> ConnectInternal(CancellationToken cancellati
497497 _publishPacketReceiverQueue ? . Dispose ( ) ;
498498 _publishPacketReceiverQueue = new AsyncQueue < MqttPublishPacket > ( ) ;
499499
500- var connectResult = await Authenticate ( Options , effectiveCancellationToken . Token ) . ConfigureAwait ( false ) ;
500+ var connectResult = await Authenticate ( channelAdapter , Options , effectiveCancellationToken . Token ) . ConfigureAwait ( false ) ;
501501
502502 _publishPacketReceiverTask = Task . Run ( ( ) => ProcessReceivedPublishPackets ( backgroundCancellationToken ) , backgroundCancellationToken ) ;
503503 _packetReceiverTask = Task . Run ( ( ) => ReceivePacketsLoop ( backgroundCancellationToken ) , backgroundCancellationToken ) ;
0 commit comments