File tree Expand file tree Collapse file tree 3 files changed +19
-3
lines changed Expand file tree Collapse file tree 3 files changed +19
-3
lines changed Original file line number Diff line number Diff line change @@ -45,7 +45,10 @@ class CompleterSubscriber implements Subscriber {
45
45
}
46
46
47
47
class StreamSubscriber implements Subscriber {
48
- StreamController controller = StreamController ();
48
+ final StreamController controller;
49
+
50
+ StreamSubscriber ({FutureOr <void > onCancel () = null })
51
+ : controller = StreamController (onCancel: onCancel);
49
52
50
53
@override
51
54
void onNext (Payload value) {
@@ -123,7 +126,10 @@ class RSocketRequester extends RSocket {
123
126
var streamId = streamIdSupplier.nextStreamId (senders);
124
127
connection.write (FrameCodec .encodeRequestStreamFrame (
125
128
streamId, MAX_REQUEST_N_SIZE , payload));
126
- var streamSubscriber = StreamSubscriber ();
129
+ var streamSubscriber = StreamSubscriber (onCancel: () {
130
+ connection.write (FrameCodec .encodeCancelFrame (streamId));
131
+ senders.remove (streamId);
132
+ });
127
133
senders[streamId] = streamSubscriber;
128
134
return streamSubscriber.payloadStream ();
129
135
};
Original file line number Diff line number Diff line change @@ -472,6 +472,16 @@ class FrameCodec {
472
472
refillFrameLength (frameBuffer);
473
473
return frameBuffer.toUint8Array ();
474
474
}
475
+
476
+ static Uint8List encodeCancelFrame (int streamId) {
477
+ var frameBuffer = RSocketByteBuffer ();
478
+ frameBuffer.writeI24 (0 ); // frame length
479
+ frameBuffer.writeI32 (streamId); //stream id
480
+ frameBuffer.writeI8 (frame_types.CANCEL << 2 );
481
+ frameBuffer.writeI8 (0 );
482
+ refillFrameLength (frameBuffer);
483
+ return frameBuffer.toUint8Array ();
484
+ }
475
485
}
476
486
477
487
Payload decodePayload (
Original file line number Diff line number Diff line change @@ -7,6 +7,6 @@ issue_tracker: https://github.com/rsocket/rsocket-dart/issues
7
7
environment :
8
8
sdk : ' >=2.7.0 <3.0.0'
9
9
dependencies :
10
- rxdart : ^0.27.0
10
+ rxdart : ^0.27.1
11
11
dev_dependencies :
12
12
test : ^1.6.0
You can’t perform that action at this time.
0 commit comments