21
21
using System . Globalization ;
22
22
using System . Net . Http ;
23
23
using System . Threading ;
24
- using System . Threading . Channels ;
25
24
using System . Threading . Tasks ;
26
25
using Newtonsoft . Json ;
27
26
using Newtonsoft . Json . Linq ;
@@ -46,14 +45,12 @@ public class DevToolsSession : IDevToolsSession
46
45
47
46
private WebSocketConnection connection ;
48
47
private ConcurrentDictionary < long , DevToolsCommandData > pendingCommands = new ConcurrentDictionary < long , DevToolsCommandData > ( ) ;
49
- private readonly Channel < string > messageQueue ;
48
+ private readonly BlockingCollection < string > messageQueue = new BlockingCollection < string > ( ) ;
50
49
private readonly Task messageQueueMonitorTask ;
51
50
private long currentCommandId = 0 ;
52
51
53
52
private DevToolsDomains domains ;
54
53
55
- private CancellationTokenSource receiveCancellationToken ;
56
-
57
54
/// <summary>
58
55
/// Initializes a new instance of the DevToolsSession class, using the specified WebSocket endpoint.
59
56
/// </summary>
@@ -71,11 +68,6 @@ public DevToolsSession(string endpointAddress)
71
68
{
72
69
this . websocketAddress = endpointAddress ;
73
70
}
74
- this . messageQueue = Channel . CreateUnbounded < string > ( new UnboundedChannelOptions ( )
75
- {
76
- SingleReader = true ,
77
- SingleWriter = true ,
78
- } ) ;
79
71
this . messageQueueMonitorTask = Task . Run ( ( ) => this . MonitorMessageQueue ( ) ) ;
80
72
this . messageQueueMonitorTask . ConfigureAwait ( false ) ;
81
73
}
@@ -166,7 +158,8 @@ public T GetVersionSpecificDomains<T>() where T : DevToolsSessionDomains
166
158
/// </summary>
167
159
/// <typeparam name="TCommandResponse"></typeparam>
168
160
/// <typeparam name="TCommand">A command object implementing the <see cref="ICommand"/> interface.</typeparam>
169
- /// <param name="cancellationToken">A CancellationToken object to allow for cancellation of the command.</param>
161
+ /// <param name="command">A CancellationToken object to allow for cancellation of the command.</param>
162
+ /// <param name="cancellationToken">The command to be sent.</param>
170
163
/// <param name="millisecondsTimeout">The execution timeout of the command in milliseconds.</param>
171
164
/// <param name="throwExceptionIfResponseNotReceived"><see langword="true"/> to throw an exception if a response is not received; otherwise, <see langword="false"/>.</param>
172
165
/// <returns>The command response object implementing the <see cref="ICommandResponse{T}"/> interface.</returns>
@@ -295,7 +288,7 @@ internal async Task StartSession(int requestedProtocolVersion)
295
288
/// Asynchronously stops the session.
296
289
/// </summary>
297
290
/// <param name="manualDetach"><see langword="true"/> to manually detach the session
298
- /// from its attached target; otherswise <see langword="false"" />.</param>
291
+ /// from its attached target; otherswise <see langword="false"/>.</param>
299
292
/// <returns>A task that represents the asynchronous operation.</returns>
300
293
internal async Task StopSession ( bool manualDetach )
301
294
{
@@ -413,38 +406,60 @@ private void OnTargetDetached(object sender, TargetDetachedEventArgs e)
413
406
private async Task InitializeSocketConnection ( )
414
407
{
415
408
LogTrace ( "Creating WebSocket" ) ;
416
- this . connection = new WebSocketConnection ( ) ;
409
+ this . connection = new WebSocketConnection ( this . openConnectionWaitTimeSpan , this . closeConnectionWaitTimeSpan ) ;
417
410
connection . DataReceived += OnConnectionDataReceived ;
418
411
await connection . Start ( this . websocketAddress ) ;
419
- LogTrace ( "WebSocket created; starting message listener " ) ;
412
+ LogTrace ( "WebSocket created" ) ;
420
413
}
421
414
422
415
private async Task TerminateSocketConnection ( )
423
416
{
417
+ LogTrace ( "Closing WebSocket" ) ;
424
418
if ( this . connection != null && this . connection . IsActive )
425
419
{
426
420
await this . connection . Stop ( ) ;
427
421
await this . ShutdownMessageQueue ( ) ;
428
422
}
423
+ LogTrace ( "WebSocket closed" ) ;
429
424
}
430
425
431
426
private async Task ShutdownMessageQueue ( )
432
427
{
433
- // Attempt to wait for the channel to empty before marking the
434
- // writer as complete and waiting for the monitor task to end.
435
428
// THe WebSockect connection is always closed before this method
436
429
// is called, so there will eventually be no more data written
437
- // into the message queue, so this loop should be guaranteed to
438
- // complete.
439
- while ( this . messageQueue . Reader . TryPeek ( out _ ) )
430
+ // into the message queue, meaning this loop should be guaranteed
431
+ // to complete.
432
+ while ( this . connection . IsActive )
440
433
{
441
434
await Task . Delay ( TimeSpan . FromMilliseconds ( 10 ) ) ;
442
435
}
443
436
444
- this . messageQueue . Writer . Complete ( ) ;
437
+ this . messageQueue . CompleteAdding ( ) ;
445
438
await this . messageQueueMonitorTask ;
446
439
}
447
440
441
+ private void MonitorMessageQueue ( )
442
+ {
443
+ // Loop until the BlockingCollection is marked as completed for adding
444
+ // (meaning no more information will be added into the collection, and
445
+ // it is empty (meaning all items in the collection have been processed).
446
+ while ( ! this . messageQueue . IsCompleted )
447
+ {
448
+ try
449
+ {
450
+ // The Take() method blocks until there is something to
451
+ // remove from the BlockingCollection.
452
+ this . ProcessMessage ( this . messageQueue . Take ( ) ) ;
453
+ }
454
+ catch ( InvalidOperationException )
455
+ {
456
+ // InvalidOperationException is normal when the collection
457
+ // is marked as completed while being blocked by the Take()
458
+ // method.
459
+ }
460
+ }
461
+ }
462
+
448
463
private void ProcessMessage ( string message )
449
464
{
450
465
var messageObject = JObject . Parse ( message ) ;
@@ -498,25 +513,6 @@ private void ProcessMessage(string message)
498
513
LogTrace ( "Recieved Other: {0}" , message ) ;
499
514
}
500
515
501
- /// <summary>
502
- /// Reads incoming messages in the message queue.
503
- /// </summary>
504
- private void ReadIncomingMessages ( )
505
- {
506
- while ( this . messageQueue . Reader . TryRead ( out string message ) )
507
- {
508
- this . ProcessMessage ( message ) ;
509
- }
510
- }
511
-
512
- private async Task MonitorMessageQueue ( )
513
- {
514
- while ( await this . messageQueue . Reader . WaitToReadAsync ( ) )
515
- {
516
- this . ReadIncomingMessages ( ) ;
517
- }
518
- }
519
-
520
516
private void OnDevToolsEventReceived ( DevToolsEventReceivedEventArgs e )
521
517
{
522
518
if ( DevToolsEventReceived != null )
@@ -527,7 +523,7 @@ private void OnDevToolsEventReceived(DevToolsEventReceivedEventArgs e)
527
523
528
524
private void OnConnectionDataReceived ( object sender , WebSocketConnectionDataReceivedEventArgs e )
529
525
{
530
- _ = this . messageQueue . Writer . TryWrite ( e . Data ) ;
526
+ this . messageQueue . Add ( e . Data ) ;
531
527
}
532
528
533
529
private void LogTrace ( string message , params object [ ] args )
0 commit comments