88"time"
99)
1010
11+ // Protocol Commands
1112const (
1213connect = iota
1314connack
@@ -58,6 +59,7 @@ PACKET_RX_LOOP:
5859return
5960}
6061case <- time .After (time .Millisecond * 100 ):
62+ log .Println ("<<<Packet in from COM TIMEOUT" )
6163continue PACKET_RX_LOOP // discard
6264}
6365
@@ -99,11 +101,11 @@ PACKET_RX_LOOP:
99101continue PACKET_RX_LOOP
100102}
101103
102- // Packet receive done. Process it.
103104com .handleRxPacket (& p )
104105}
105106}
106107
108+ // Packet RX done. Handle it.
107109func (com * comHandler ) handleRxPacket (packet * Packet ) {
108110rxSeqFlag := (packet .command & 0x80 ) > 0
109111switch packet .command & 0x7F {
@@ -117,53 +119,73 @@ func (com *comHandler) handleRxPacket(packet *Packet) {
117119}
118120}
119121case acknowledge :
120- com .acknowledgeEvent <- rxSeqFlag // TODO: if not connected this will block forever
122+ if com .state == connected {
123+ com .acknowledgeEvent <- rxSeqFlag
124+ }
121125case connect :
122- log .Println ("got CONNECT PACKET" )
123126if com .state != disconnected {
124127return
125128}
126129if len (packet .payload ) != 6 {
127130return
128131}
132+
129133port := binary .LittleEndian .Uint16 (packet .payload [4 :])
130- destination := strconv .Itoa (int (packet .payload [0 ])) + "." + strconv .Itoa (int (packet .payload [1 ])) + "." + strconv .Itoa (int (packet .payload [2 ])) + "." + strconv .Itoa (int (packet .payload [3 ])) + ":" + strconv .Itoa (int (port ))
134+
135+ destination := strconv .Itoa (int (packet .payload [0 ])) + "."
136+ destination += strconv .Itoa (int (packet .payload [1 ])) + "."
137+ destination += strconv .Itoa (int (packet .payload [2 ])) + "."
138+ destination += strconv .Itoa (int (packet .payload [3 ])) + ":"
139+ destination += strconv .Itoa (int (port ))
140+
131141log .Printf ("Dialing to: %v" , destination )
132142if err := com .dialTCP (destination ); err != nil { // TODO: add timeout
143+ log .Printf ("Failed to connect to: %v" , destination )
133144com .txBuffer <- Packet {command : disconnect } // TODO: payload to contain error or timeout
134145return
135146}
147+ log .Printf ("Connected" )
136148com .startEvent <- true
137149com .state = connected
138150com .txBuffer <- Packet {command : connack }
139151}
140152}
141153
142- // Publish packet received from a channel.
143- // Will block for second publish, until ack is received for first.
154+ // Publish data received from upstream tcp server.
155+ // We need to get an Ack before sending the next publish packet.
156+ // Resend same publish packet after timeout, and kill link after 5 retries.
144157func (com * comHandler ) packetSender () {
145158sequenceTxFlag := false
159+ retries := 0
146160tx := make ([]byte , 512 )
147161for {
148162nRx , err := com .tcpLink .Read (tx )
149163if err != nil {
150164log .Fatal ("Error Receiving from TCP" )
151165}
152- log .Println (">>>Packet out to COM START" )
153- log .Println ("------------------->>>>>>>" )
154166p := Packet {command : publish , payload : tx [:nRx ]}
155167if sequenceTxFlag {
156168p .command |= 0x80
157169}
170+ PUB_LOOP:
158171for {
159172com .txBuffer <- p
160- ack := <- com .acknowledgeEvent
161- if ack == sequenceTxFlag {
162- sequenceTxFlag = ! sequenceTxFlag
163- break
173+ select {
174+ case ack := <- com .acknowledgeEvent :
175+ retries = 0
176+ if ack == sequenceTxFlag {
177+ sequenceTxFlag = ! sequenceTxFlag
178+ break PUB_LOOP // success
179+ }
180+ case <- time .After (time .Millisecond * 500 ):
181+ retries ++
182+ if retries >= 5 {
183+ com .txBuffer <- Packet {command : disconnect }
184+ com .state = disconnected
185+ log .Println ("Too many send retries. Client disconnected" )
186+ return
187+ }
164188}
165- log .Println (">>>RETRY out to COM" )
166189}
167- log .Println (">>>Packet out to COM DONE" )
168190}
169191}
0 commit comments