@@ -5,13 +5,92 @@ use crate::result::RSocketResult;
5
5
use bytes:: { Buf , BufMut , Bytes , BytesMut } ;
6
6
7
7
const MAX_MIME_LEN : usize = 0x7F ;
8
+ const MAX_ROUTING_TAG_LEN : usize = 0xFF ;
8
9
9
10
#[ derive( Debug , Clone , Eq , PartialEq ) ]
10
11
pub struct CompositeMetadata {
11
12
mime : String ,
12
13
payload : Bytes ,
13
14
}
14
15
16
+ #[ derive( Debug , Clone ) ]
17
+ pub struct RoutingMetadata {
18
+ tags : Vec < String > ,
19
+ }
20
+
21
+ pub struct RoutingMetadataBuilder {
22
+ inner : RoutingMetadata ,
23
+ }
24
+
25
+ impl RoutingMetadataBuilder {
26
+ pub fn push_str ( self , tag : & str ) -> Self {
27
+ self . push ( String :: from ( tag) )
28
+ }
29
+
30
+ pub fn push ( mut self , tag : String ) -> Self {
31
+ if tag. len ( ) > MAX_ROUTING_TAG_LEN {
32
+ panic ! ( "exceeded maximum routing tag length!" ) ;
33
+ }
34
+ self . inner . tags . push ( tag) ;
35
+ self
36
+ }
37
+ pub fn build ( self ) -> RoutingMetadata {
38
+ self . inner
39
+ }
40
+ }
41
+
42
+ impl RoutingMetadata {
43
+ pub fn builder ( ) -> RoutingMetadataBuilder {
44
+ RoutingMetadataBuilder {
45
+ inner : RoutingMetadata { tags : vec ! [ ] } ,
46
+ }
47
+ }
48
+
49
+ pub fn decode ( bf : & mut BytesMut ) -> RSocketResult < RoutingMetadata > {
50
+ let mut bu = RoutingMetadata :: builder ( ) ;
51
+ loop {
52
+ match Self :: decode_once ( bf) {
53
+ Ok ( v) => match v {
54
+ Some ( tag) => bu = bu. push ( tag) ,
55
+ None => break ,
56
+ } ,
57
+ Err ( e) => return Err ( e) ,
58
+ }
59
+ }
60
+ Ok ( bu. build ( ) )
61
+ }
62
+
63
+ fn decode_once ( bf : & mut BytesMut ) -> RSocketResult < Option < String > > {
64
+ if bf. is_empty ( ) {
65
+ return Ok ( None ) ;
66
+ }
67
+ let size = bf. get_u8 ( ) as usize ;
68
+ if bf. len ( ) < size {
69
+ return Err ( RSocketError :: from ( "require more bytes!" ) ) ;
70
+ }
71
+ let tag = String :: from_utf8 ( bf. split_to ( size) . to_vec ( ) ) . unwrap ( ) ;
72
+ Ok ( Some ( tag) )
73
+ }
74
+
75
+ pub fn get_tags ( & self ) -> & Vec < String > {
76
+ & self . tags
77
+ }
78
+
79
+ pub fn write_to ( & self , bf : & mut BytesMut ) {
80
+ for tag in & self . tags {
81
+ let size = tag. len ( ) as u8 ;
82
+ bf. put_u8 ( size) ;
83
+ bf. put_slice ( tag. as_bytes ( ) ) ;
84
+ }
85
+ }
86
+
87
+ pub fn bytes ( & self ) -> Bytes {
88
+ let mut bf = BytesMut :: new ( ) ;
89
+ self . write_to ( & mut bf) ;
90
+ bf. freeze ( )
91
+ }
92
+ }
93
+
15
94
impl CompositeMetadata {
16
95
pub fn new ( mime : String , payload : Bytes ) -> CompositeMetadata {
17
96
if mime. len ( ) > MAX_MIME_LEN {
@@ -50,24 +129,18 @@ impl CompositeMetadata {
50
129
// Bad
51
130
let mime_len = first as usize ;
52
131
if bs. len ( ) < mime_len {
53
- return Err ( RSocketError :: from ( ErrorKind :: WithDescription (
54
- "bad COMPOSITE_METADATA bytes: missing required bytes!" ,
55
- ) ) ) ;
132
+ return Err ( RSocketError :: from ( "broken COMPOSITE_METADATA bytes!" ) ) ;
56
133
}
57
134
let front = bs. split_to ( mime_len) ;
58
135
String :: from_utf8 ( front. to_vec ( ) ) . unwrap ( )
59
136
} ;
60
137
61
138
if bs. len ( ) < 3 {
62
- return Err ( RSocketError :: from ( ErrorKind :: WithDescription (
63
- "bad COMPOSITE_METADATA bytes: missing required bytes!" ,
64
- ) ) ) ;
139
+ return Err ( RSocketError :: from ( "broken COMPOSITE_METADATA bytes!" ) ) ;
65
140
}
66
141
let payload_size = U24 :: read_advance ( bs) as usize ;
67
142
if bs. len ( ) < payload_size {
68
- return Err ( RSocketError :: from ( ErrorKind :: WithDescription (
69
- "bad COMPOSITE_METADATA bytes: missing required bytes!" ,
70
- ) ) ) ;
143
+ return Err ( RSocketError :: from ( "broken COMPOSITE_METADATA bytes!" ) ) ;
71
144
}
72
145
let p = bs. split_to ( payload_size) . freeze ( ) ;
73
146
Ok ( Some ( CompositeMetadata :: new ( m, p) ) )
0 commit comments