File tree Expand file tree Collapse file tree 1 file changed +8
-2
lines changed Expand file tree Collapse file tree 1 file changed +8
-2
lines changed Original file line number Diff line number Diff line change @@ -45,7 +45,10 @@ class CompleterSubscriber implements Subscriber {
45
45
}
46
46
47
47
class StreamSubscriber implements Subscriber {
48
- StreamController controller = StreamController ();
48
+ final StreamController controller;
49
+
50
+ StreamSubscriber ({FutureOr <void > onCancel () = null })
51
+ : controller = StreamController (onCancel: onCancel);
49
52
50
53
@override
51
54
void onNext (Payload value) {
@@ -123,7 +126,10 @@ class RSocketRequester extends RSocket {
123
126
var streamId = streamIdSupplier.nextStreamId (senders);
124
127
connection.write (FrameCodec .encodeRequestStreamFrame (
125
128
streamId, MAX_REQUEST_N_SIZE , payload));
126
- var streamSubscriber = StreamSubscriber ();
129
+ var streamSubscriber = StreamSubscriber (onCancel: () {
130
+ connection.write (FrameCodec .encodeCancelFrame (streamId));
131
+ senders.remove (streamId);
132
+ });
127
133
senders[streamId] = streamSubscriber;
128
134
return streamSubscriber.payloadStream ();
129
135
};
You can’t perform that action at this time.
0 commit comments