@@ -15,7 +15,7 @@ Future<void> voidFuture() async {}
15
15
const MAX_REQUEST_N_SIZE = 0x7FFFFFFF ;
16
16
17
17
abstract class Subscriber {
18
- void onNext (Payload value);
18
+ void onNext (Payload ? value);
19
19
20
20
void onError (dynamic error);
21
21
@@ -24,12 +24,12 @@ abstract class Subscriber {
24
24
25
25
class CompleterSubscriber implements Subscriber {
26
26
Completer completer;
27
- Payload payload;
27
+ Payload ? payload;
28
28
29
29
CompleterSubscriber (this .completer);
30
30
31
31
@override
32
- void onNext (Payload payload) {
32
+ void onNext (Payload ? payload) {
33
33
this .payload = payload;
34
34
}
35
35
@@ -47,11 +47,11 @@ class CompleterSubscriber implements Subscriber {
47
47
class StreamSubscriber implements Subscriber {
48
48
final StreamController controller;
49
49
50
- StreamSubscriber ({FutureOr <void > onCancel () = null })
50
+ StreamSubscriber ({FutureOr <void > onCancel ()? = null })
51
51
: controller = StreamController (onCancel: onCancel);
52
52
53
53
@override
54
- void onNext (Payload value) {
54
+ void onNext (Payload ? value) {
55
55
controller.add (value);
56
56
}
57
57
@@ -65,26 +65,26 @@ class StreamSubscriber implements Subscriber {
65
65
controller.close ().then ((value) => {});
66
66
}
67
67
68
- Stream <Payload > payloadStream () {
69
- return controller.stream.map ((item) => item as Payload );
68
+ Stream <Payload ? > payloadStream () {
69
+ return controller.stream.map ((item) => item as Payload ? );
70
70
}
71
71
}
72
72
73
73
class RSocketRequester extends RSocket {
74
74
bool closed = false ;
75
75
double _availability = 1.0 ;
76
- Timer keepAliveTimer;
77
- StreamIdSupplier streamIdSupplier;
78
- ConnectionSetupPayload connectionSetupPayload;
79
- DuplexConnection connection;
76
+ Timer ? keepAliveTimer;
77
+ late StreamIdSupplier streamIdSupplier;
78
+ ConnectionSetupPayload ? connectionSetupPayload;
79
+ late DuplexConnection connection;
80
80
81
81
//buffer for data chunk
82
- List <int > chunkBuffer;
82
+ List <int >? chunkBuffer;
83
83
84
84
Map <int , Subscriber > senders = {};
85
- RSocket responder;
85
+ RSocket ? responder;
86
86
String mode = 'requester' ;
87
- ErrorConsumer errorConsumer;
87
+ ErrorConsumer ? errorConsumer;
88
88
89
89
RSocketRequester (String mode, ConnectionSetupPayload connectionSetupPayload,
90
90
DuplexConnection connection) {
@@ -109,23 +109,23 @@ class RSocketRequester extends RSocket {
109
109
//RSocket requestResponse
110
110
requestResponse = (payload) {
111
111
var completer = Completer <Payload >();
112
- var streamId = streamIdSupplier.nextStreamId (senders);
112
+ var streamId = streamIdSupplier.nextStreamId (senders)! ;
113
113
connection
114
- .write (FrameCodec .encodeRequestResponseFrame (streamId, payload));
114
+ .write (FrameCodec .encodeRequestResponseFrame (streamId, payload! ));
115
115
senders[streamId] = CompleterSubscriber (completer);
116
116
return completer.future;
117
117
};
118
118
//RSocket fireAndForget
119
119
fireAndForget = (payload) {
120
- var streamId = streamIdSupplier.nextStreamId (senders);
121
- connection.write (FrameCodec .encodeFireAndForgetFrame (streamId, payload));
120
+ var streamId = streamIdSupplier.nextStreamId (senders)! ;
121
+ connection.write (FrameCodec .encodeFireAndForgetFrame (streamId, payload! ));
122
122
return Future .value (() {});
123
123
};
124
124
//RSocket requestStream
125
125
requestStream = (payload) {
126
- var streamId = streamIdSupplier.nextStreamId (senders);
126
+ var streamId = streamIdSupplier.nextStreamId (senders)! ;
127
127
connection.write (FrameCodec .encodeRequestStreamFrame (
128
- streamId, MAX_REQUEST_N_SIZE , payload));
128
+ streamId, MAX_REQUEST_N_SIZE , payload! ));
129
129
var streamSubscriber = StreamSubscriber (onCancel: () {
130
130
connection.write (FrameCodec .encodeCancelFrame (streamId));
131
131
senders.remove (streamId);
@@ -135,7 +135,7 @@ class RSocketRequester extends RSocket {
135
135
};
136
136
//RSocket metadataPush
137
137
metadataPush = (payload) {
138
- connection.write (FrameCodec .encodeMetadataFrame (0 , payload));
138
+ connection.write (FrameCodec .encodeMetadataFrame (0 , payload! ));
139
139
return Future .value (() {});
140
140
};
141
141
//Rsocket Channel
@@ -153,7 +153,7 @@ class RSocketRequester extends RSocket {
153
153
connection.write (setupPayloadFrame ());
154
154
if (mode == 'requester' ) {
155
155
keepAliveTimer = Timer .periodic (
156
- Duration (seconds: connectionSetupPayload.keepAliveInterval),
156
+ Duration (seconds: connectionSetupPayload! .keepAliveInterval),
157
157
(Timer t) {
158
158
if (! closed) {
159
159
connection.write (FrameCodec .encodeKeepAlive (false , 0 ));
@@ -181,12 +181,12 @@ class RSocketRequester extends RSocket {
181
181
182
182
void receiveChunk (Uint8List chunk) {
183
183
if (this .chunkBuffer != null ) {
184
- this .chunkBuffer = this .chunkBuffer + chunk;
185
- var chunkDataLength = this .chunkBuffer.length - 3 ;
186
- var bytes = this .chunkBuffer.sublist (0 , 3 );
187
- var rsocketFrameLength = bytesToNumber (bytes);
184
+ this .chunkBuffer = this .chunkBuffer! + chunk;
185
+ var chunkDataLength = this .chunkBuffer! .length - 3 ;
186
+ var bytes = this .chunkBuffer! .sublist (0 , 3 );
187
+ var rsocketFrameLength = bytesToNumber (bytes)! ;
188
188
if (rsocketFrameLength <= chunkDataLength) {
189
- for (var frame in parseFrames (this .chunkBuffer)) {
189
+ for (var frame in parseFrames (this .chunkBuffer! )) {
190
190
receiveFrame (frame);
191
191
}
192
192
this .chunkBuffer = null ;
@@ -196,7 +196,7 @@ class RSocketRequester extends RSocket {
196
196
if (chunk.length > 3 ) {
197
197
var chunkDataLength = chunk.length - 3 ;
198
198
var bytes = chunk.sublist (0 , 3 );
199
- var rsocketFrameLength = bytesToNumber (bytes);
199
+ var rsocketFrameLength = bytesToNumber (bytes)! ;
200
200
if (rsocketFrameLength > chunkDataLength) {
201
201
this .chunkBuffer = chunk;
202
202
return ;
@@ -219,12 +219,12 @@ class RSocketRequester extends RSocket {
219
219
if (payloadFrame.completed) {
220
220
senders.remove (streamId);
221
221
if (payload? .data != null ) {
222
- subscriber.onNext (payload);
222
+ subscriber! .onNext (payload);
223
223
}
224
- subscriber.onComplete ();
224
+ subscriber! .onComplete ();
225
225
} else {
226
226
if (payload? .data != null ) {
227
- subscriber.onNext (payload);
227
+ subscriber! .onNext (payload);
228
228
}
229
229
}
230
230
}
@@ -241,10 +241,10 @@ class RSocketRequester extends RSocket {
241
241
var streamId = header.streamId;
242
242
var error = RSocketException (errorFrame.code, errorFrame.message);
243
243
if (streamId == 0 && errorConsumer != null ) {
244
- errorConsumer (error);
244
+ errorConsumer ! (error);
245
245
} else {
246
246
if (senders.containsKey (streamId)) {
247
- var subscriber = senders[streamId];
247
+ var subscriber = senders[streamId]! ;
248
248
senders.remove (streamId);
249
249
subscriber.onError (error);
250
250
}
@@ -261,37 +261,37 @@ class RSocketRequester extends RSocket {
261
261
case frame_types.REQUEST_RESPONSE :
262
262
var requestResponseFrame = frame as RequestResponseFrame ;
263
263
if (responder != null && requestResponseFrame.payload != null ) {
264
- responder
265
- .requestResponse (requestResponseFrame.payload)
264
+ responder! .requestResponse !(requestResponseFrame.payload)
266
265
.then ((payload) {
267
266
connection.write (
268
267
FrameCodec .encodePayloadFrame (header.streamId, true , payload));
269
268
}).catchError ((error) {
270
269
var rsocketError = convertToRSocketException (error);
271
270
connection.write (FrameCodec .encodeErrorFrame (
272
- header.streamId, rsocketError.code, rsocketError.message));
271
+ header.streamId, rsocketError.code! , rsocketError.message));
273
272
});
274
273
}
275
274
break ;
276
275
case frame_types.REQUEST_FNF :
277
276
var fireAndForgetFrame = frame as RequestFNFFrame ;
278
277
if (responder != null && fireAndForgetFrame.payload != null ) {
279
- responder
280
- .fireAndForget (fireAndForgetFrame.payload)
278
+ responder! .fireAndForget !(fireAndForgetFrame.payload)
281
279
.then ((value) => {});
282
280
}
283
281
break ;
284
282
case frame_types.METADATA_PUSH :
285
283
var metadataPushFrame = frame as MetadataPushFrame ;
286
284
if (responder != null && metadataPushFrame.payload != null ) {
287
- responder.metadataPush (metadataPushFrame.payload).then ((value) => {});
285
+ responder! .metadataPush !(metadataPushFrame.payload)
286
+ .then ((value) => {});
288
287
}
289
288
break ;
290
289
case frame_types.REQUEST_STREAM :
291
290
var requestStreamFrame = frame as RequestStreamFrame ;
292
291
var requesterStreamId = header.streamId;
293
292
if (responder != null && requestStreamFrame.payload != null ) {
294
- responder.requestStream (requestStreamFrame.payload).listen ((payload) {
293
+ responder! .requestStream !(requestStreamFrame.payload).listen (
294
+ (payload) {
295
295
connection.write (FrameCodec .encodePayloadFrame (
296
296
requesterStreamId, false , payload));
297
297
}, onDone: () {
@@ -301,7 +301,7 @@ class RSocketRequester extends RSocket {
301
301
if (error is RSocketException ) {
302
302
var e = error;
303
303
connection.write (FrameCodec .encodeErrorFrame (
304
- requesterStreamId, e.code, e.message));
304
+ requesterStreamId, e.code! , e.message));
305
305
} else {
306
306
connection.write (FrameCodec .encodeErrorFrame (requesterStreamId,
307
307
RSocketErrorCode .APPLICATION_ERROR , error.toString ()));
@@ -315,10 +315,10 @@ class RSocketRequester extends RSocket {
315
315
316
316
Uint8List setupPayloadFrame () {
317
317
return FrameCodec .encodeSetupFrame (
318
- connectionSetupPayload.keepAliveInterval,
319
- connectionSetupPayload.keepAliveMaxLifetime,
320
- connectionSetupPayload.metadataMimeType,
321
- connectionSetupPayload.dataMimeType,
318
+ connectionSetupPayload! .keepAliveInterval,
319
+ connectionSetupPayload! .keepAliveMaxLifetime,
320
+ connectionSetupPayload! .metadataMimeType,
321
+ connectionSetupPayload! .dataMimeType,
322
322
connectionSetupPayload);
323
323
}
324
324
}
0 commit comments