Skip to content

Commit 97864d3

Browse files
committed
Abstract packetReader to packetParser
+Sacrifice tcp connection string building speed for code simplification and readability
1 parent 57ac0e3 commit 97864d3

File tree

3 files changed

+95
-171
lines changed

3 files changed

+95
-171
lines changed

protocol/packet.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ package protocol
33
import (
44
"encoding/binary"
55
"hash/crc32"
6+
"log"
7+
"strconv"
8+
"time"
69
)
710

811
// Protocol commands.
@@ -36,3 +39,92 @@ func (p Packet) serialize() []byte {
3639
func (p Packet) calcCrc() uint32 {
3740
return crc32.ChecksumIEEE(p.serialize()[:len(p.payload)+2])
3841
}
42+
43+
// Parse RX buffer for legitimate packets.
44+
func (t *protocolTransport) packetParser(packetHandler func(*Packet), onTimeout func()) {
45+
defer t.session.Done()
46+
timeouts := 0
47+
PACKET_RX_LOOP:
48+
for {
49+
if timeouts >= 5 {
50+
if t.state == Connected {
51+
log.Println("RX packet timeout")
52+
t.txBuff <- Packet{command: disconnect}
53+
if onTimeout != nil {
54+
onTimeout()
55+
}
56+
return
57+
}
58+
timeouts = 0
59+
}
60+
61+
p := Packet{}
62+
var ok bool
63+
64+
// Length byte
65+
p.length, ok = <-t.rxBuff
66+
if !ok {
67+
return
68+
}
69+
70+
// Command byte
71+
select {
72+
case p.command, ok = <-t.rxBuff:
73+
if !ok {
74+
return
75+
}
76+
case <-time.After(time.Millisecond * 100):
77+
timeouts++
78+
continue PACKET_RX_LOOP // discard
79+
}
80+
81+
// Payload
82+
for i := 0; i < int(p.length)-5; i++ {
83+
select {
84+
case payloadByte, ok := <-t.rxBuff:
85+
if !ok {
86+
return
87+
}
88+
p.payload = append(p.payload, payloadByte)
89+
case <-time.After(time.Millisecond * 100):
90+
timeouts++
91+
continue PACKET_RX_LOOP
92+
}
93+
}
94+
95+
// CRC32
96+
rxCrc := make([]byte, 0, 4)
97+
for i := 0; i < 4; i++ {
98+
select {
99+
case crcByte, ok := <-t.rxBuff:
100+
if !ok {
101+
return
102+
}
103+
rxCrc = append(rxCrc, crcByte)
104+
case <-time.After(time.Millisecond * 100):
105+
timeouts++
106+
continue PACKET_RX_LOOP
107+
}
108+
}
109+
p.crc = binary.LittleEndian.Uint32(rxCrc)
110+
111+
// Integrity Checking
112+
if p.calcCrc() != p.crc {
113+
log.Println("RX packet CRCFAIL")
114+
timeouts++
115+
continue PACKET_RX_LOOP
116+
}
117+
timeouts = 0
118+
packetHandler(&p)
119+
}
120+
}
121+
122+
func makeTCPConnString(connPayload []byte) string {
123+
port := binary.LittleEndian.Uint16(connPayload[4:])
124+
connString := ""
125+
for i := 0; i < 3; i++ {
126+
connString += strconv.Itoa(int(connPayload[i])) + "."
127+
}
128+
connString += strconv.Itoa(int(connPayload[3])) + ":" + strconv.Itoa(int(port))
129+
return connString
130+
}

protocol/protocolClient.go

Lines changed: 1 addition & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package protocol
22

33
import (
44
"bytes"
5-
"encoding/binary"
65
"log"
76
"time"
87
)
@@ -32,7 +31,7 @@ func (c *client) Connect(IP *[4]byte, port uint16) int {
3231

3332
c.session.Add(3)
3433
go c.rxSerial(nil)
35-
go c.packetReader()
34+
go c.packetParser(c.handleRxPacket, c.Stop)
3635
go c.txSerial(nil)
3736
//c.session.Wait()
3837

@@ -110,83 +109,6 @@ func (c *client) Stop() {
110109
c.state = Disconnected
111110
}
112111

113-
// Parse RX buffer for legitimate packets.
114-
func (c *client) packetReader() {
115-
defer c.session.Done()
116-
timeouts := 0
117-
PACKET_RX_LOOP:
118-
for {
119-
if timeouts >= 5 {
120-
if c.state == Connected {
121-
log.Println("COM timeout. Disconnecting from server")
122-
c.txBuff <- Packet{command: disconnect}
123-
c.Stop()
124-
return
125-
}
126-
timeouts = 0
127-
}
128-
129-
p := Packet{}
130-
var ok bool
131-
132-
// Length byte
133-
p.length, ok = <-c.rxBuff
134-
if !ok {
135-
return
136-
}
137-
138-
// Command byte
139-
select {
140-
case p.command, ok = <-c.rxBuff:
141-
if !ok {
142-
return
143-
}
144-
case <-time.After(time.Millisecond * 100):
145-
timeouts++
146-
continue PACKET_RX_LOOP // discard
147-
}
148-
149-
// Payload
150-
for i := 0; i < int(p.length)-5; i++ {
151-
select {
152-
case payloadByte, ok := <-c.rxBuff:
153-
if !ok {
154-
return
155-
}
156-
p.payload = append(p.payload, payloadByte)
157-
case <-time.After(time.Millisecond * 100):
158-
timeouts++
159-
continue PACKET_RX_LOOP
160-
}
161-
}
162-
163-
// CRC32
164-
rxCrc := make([]byte, 0, 4)
165-
for i := 0; i < 4; i++ {
166-
select {
167-
case crcByte, ok := <-c.rxBuff:
168-
if !ok {
169-
return
170-
}
171-
rxCrc = append(rxCrc, crcByte)
172-
case <-time.After(time.Millisecond * 100):
173-
timeouts++
174-
continue PACKET_RX_LOOP
175-
}
176-
}
177-
p.crc = binary.LittleEndian.Uint32(rxCrc)
178-
179-
// Integrity Checking
180-
if p.calcCrc() != p.crc {
181-
log.Println("Client packet RX CRCFAIL")
182-
timeouts++
183-
continue PACKET_RX_LOOP
184-
}
185-
timeouts = 0
186-
c.handleRxPacket(&p)
187-
}
188-
}
189-
190112
// Packet RX done. Handle it.
191113
func (c *client) handleRxPacket(packet *Packet) {
192114
var rxSeqFlag bool = (packet.command & 0x80) > 0

protocol/protocolGateway.go

Lines changed: 2 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
package protocol
22

33
import (
4-
"bytes"
5-
"encoding/binary"
64
"log"
75
"net"
8-
"strconv"
96
"time"
107
)
118

@@ -24,87 +21,10 @@ func (g *gateway) Listen(ds serialInterface) {
2421

2522
g.session.Add(2)
2623
go g.rxSerial(g.dropGateway)
27-
go g.packetReader()
24+
go g.packetParser(g.handleRxPacket, g.dropLink)
2825
g.session.Wait()
2926
}
3027

31-
// Parse RX buffer for legitimate packets.
32-
func (g *gateway) packetReader() {
33-
defer g.session.Done()
34-
timeouts := 0
35-
PACKET_RX_LOOP:
36-
for {
37-
if timeouts >= 5 {
38-
if g.state == Connected {
39-
log.Println("Downstream RX timeout. Disconnecting client")
40-
g.txBuff <- Packet{command: disconnect}
41-
g.dropLink()
42-
return
43-
}
44-
timeouts = 0
45-
}
46-
47-
p := Packet{}
48-
var ok bool
49-
50-
// Length byte
51-
p.length, ok = <-g.rxBuff
52-
if !ok {
53-
return
54-
}
55-
56-
// Command byte
57-
select {
58-
case p.command, ok = <-g.rxBuff:
59-
if !ok {
60-
return
61-
}
62-
case <-time.After(time.Millisecond * 100):
63-
timeouts++
64-
continue PACKET_RX_LOOP // discard
65-
}
66-
67-
// Payload
68-
for i := 0; i < int(p.length)-5; i++ {
69-
select {
70-
case payloadByte, ok := <-g.rxBuff:
71-
if !ok {
72-
return
73-
}
74-
p.payload = append(p.payload, payloadByte)
75-
case <-time.After(time.Millisecond * 100):
76-
timeouts++
77-
continue PACKET_RX_LOOP
78-
}
79-
}
80-
81-
// CRC32
82-
rxCrc := make([]byte, 0, 4)
83-
for i := 0; i < 4; i++ {
84-
select {
85-
case crcByte, ok := <-g.rxBuff:
86-
if !ok {
87-
return
88-
}
89-
rxCrc = append(rxCrc, crcByte)
90-
case <-time.After(time.Millisecond * 100):
91-
timeouts++
92-
continue PACKET_RX_LOOP
93-
}
94-
}
95-
p.crc = binary.LittleEndian.Uint32(rxCrc)
96-
97-
// Integrity Checking
98-
if p.calcCrc() != p.crc {
99-
log.Println("Downstream packet RX CRCFAIL")
100-
timeouts++
101-
continue PACKET_RX_LOOP
102-
}
103-
timeouts = 0
104-
g.handleRxPacket(&p)
105-
}
106-
}
107-
10828
// Packet RX done. Handle it.
10929
func (g *gateway) handleRxPacket(packet *Packet) {
11030
var rxSeqFlag bool = (packet.command & 0x80) > 0
@@ -135,17 +55,7 @@ func (g *gateway) handleRxPacket(packet *Packet) {
13555
return
13656
}
13757

138-
port := binary.LittleEndian.Uint16(packet.payload[4:])
139-
140-
var dst bytes.Buffer
141-
for i := 0; i < 3; i++ {
142-
dst.WriteString(strconv.Itoa(int(packet.payload[i])))
143-
dst.WriteByte('.')
144-
}
145-
dst.WriteString(strconv.Itoa(int(packet.payload[3])))
146-
dst.WriteByte(':')
147-
dst.WriteString(strconv.Itoa(int(port)))
148-
dstStr := dst.String()
58+
dstStr := makeTCPConnString(packet.payload)
14959

15060
g.txBuff = make(chan Packet, 2)
15161
g.expectedRxSeqFlag = false

0 commit comments

Comments
 (0)